view_object.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418
  1. import logging
  2. import pathlib as pl
  3. import shutil
  4. import sys
  5. import tempfile
  6. import time
  7. from pkg_resources import get_distribution
  8. from .flags import FlagsManager
  9. from view.python_core.gdm_generation import get_roi_gdm_traces_dict, get_gdm_file
  10. from .measurement_list import MeasurementList
  11. from .measurement_list.importers import get_setup_extension
  12. from .movies import export_movie
  13. from .overviews import generate_overview_image, generate_overview_image_for_output
  14. from .overviews.ctv_handlers import get_ctv_handler
  15. from .p1_class import get_empty_p1, get_p1
  16. from .rois.roi_io import get_roi_io_class
  17. import gc
  18. class VIEW(object):
  19. def __init__(self, flags=None, terminal_output_verbose=True):
  20. """
  21. Initializes a VIEW object with default flags
  22. """
  23. self.flags = FlagsManager()
  24. if flags is not None:
  25. self.flags.update_flags({"STG_MotherOfAllFolders": flags["STG_MotherOfAllFolders"]})
  26. self.flags.update_flags({k: v for k, v in flags.items() if k not in ["STG_MotherOfAllFolders"]})
  27. self.flags.update_flags({"VIEW_batchmode": True})
  28. self.measurement_list = None
  29. self.p1 = None
  30. self.log_file = self.setup_logging(terminal_output_verbose)
  31. logging.getLogger("VIEW").info(
  32. f"VIEW object initialized for offline use. Version: {get_distribution('view').version}")
  33. def __del__(self):
  34. del self.flags
  35. self.delete_data()
  36. def delete_data(self):
  37. try:
  38. self.p1.__del__()
  39. del self.measurement_list
  40. except AttributeError as ae:
  41. pass
  42. self.p1 = None
  43. self.measurement_list = None
  44. gc.collect()
  45. def setup_logging(self, terminal_output_verbose):
  46. my_logger = logging.getLogger("VIEW")
  47. my_logger.propagate = False
  48. my_logger.setLevel(level=logging.INFO)
  49. if not my_logger.hasHandlers():
  50. temp_dir = tempfile.gettempdir()
  51. view_log_dir_path = pl.Path(temp_dir) / "VIEW_logs"
  52. view_log_dir_path.mkdir(exist_ok=True)
  53. log_file_path = view_log_dir_path / f"VIEW_started_at_{time.strftime('%Y-%m-%d-%H-%M-%S')}.log"
  54. formatter = logging.Formatter("%(asctime)s [VIEW] [%(levelname)-5.5s] %(message)s")
  55. file_handler = logging.FileHandler(log_file_path)
  56. file_handler.setLevel(level=logging.INFO)
  57. file_handler.setFormatter(formatter)
  58. my_logger.addHandler(file_handler)
  59. if terminal_output_verbose:
  60. stream_handler = logging.StreamHandler(sys.stdout)
  61. stream_handler.setLevel(level=logging.DEBUG)
  62. stream_handler.setFormatter(formatter)
  63. my_logger.addHandler(stream_handler)
  64. log_file = str(log_file_path)
  65. else:
  66. file_handler = [x for x in my_logger.handlers if isinstance(x, logging.FileHandler)][0]
  67. log_file = file_handler.baseFilename
  68. return log_file
  69. def update_flags(self, flags):
  70. """
  71. Updates VIEW flags with <flags>
  72. :param flags: dict, keys are flag names, values are flag values
  73. :return: None
  74. """
  75. self.flags.update_flags(flags)
  76. def update_flags_from_ymlfile(self, yml_filename):
  77. """
  78. Updates VIEW flags with flags from YML file
  79. :param yml_filename: str, path on file system to a YML flags file
  80. :return: None
  81. """
  82. self.flags.read_flags_from_yml(yml_filename)
  83. self.flags.update_flags({"VIEW_batchmode": True})
  84. def initialize_animal(self, animal):
  85. """
  86. Initializes the flag "STG_ReportTag" to <animal>. Tries to find and load the measurement list file
  87. for the animal <animal>. Raises FileNotFoundError if not found.
  88. :param animal: string, name/tag of the animal
  89. """
  90. self.flags.update_flags({
  91. "STG_ReportTag": animal
  92. })
  93. lst_file = self.flags.get_existing_lst_file()
  94. if lst_file is None:
  95. raise FileNotFoundError(f"Could not find a list file for animal={animal} "
  96. f"in {self.flags['STG_OdorInfoPath']}")
  97. self.measurement_list = MeasurementList.create_from_lst_file(
  98. lst_fle=lst_file, LE_loadExp=self.flags["LE_loadExp"])
  99. def initialize_animal_from_list_file(self, list_file):
  100. """
  101. Extracts the stem of the list file name and uses it as animal name ("STG_ReportTag"). Initializes the specified
  102. animal list
  103. :param list_file:
  104. """
  105. self.measurement_list = MeasurementList.create_from_lst_file(
  106. lst_fle=list_file, LE_loadExp=self.flags["LE_loadExp"])
  107. assert self.measurement_list.animal_name is not None, "Something went wrong!"
  108. self.flags.update_flags({"STG_ReportTag": self.measurement_list.animal_name})
  109. def get_current_animal(self):
  110. self.check_if_animal_is_initialized()
  111. return self.flags['STG_ReportTag']
  112. def check_if_animal_is_initialized(self):
  113. """
  114. Raises an error if no animal has been initialized
  115. """
  116. if self.measurement_list is not None:
  117. measurement_list_name = pl.Path(self.measurement_list.last_measurement_list_fle).name
  118. if measurement_list_name.startswith(f"{self.flags['STG_ReportTag']}."):
  119. return # all good
  120. raise ValueError("No animal initialized, "
  121. "please initialize first VIEW with an animal using the method 'initialize_animal'")
  122. def get_measus_for_current_animal(self, analyze_values_to_use=None):
  123. """
  124. Returns a list of "measu" values for the currently initialized animal, for which the corresponding entry in the
  125. column 'Analyze' is one among <analyze_to_use>. If <analyze_values_accepted> is None,
  126. then all measus for the animal are returned
  127. :return: list of int
  128. """
  129. self.check_if_animal_is_initialized()
  130. return self.measurement_list.get_measus(analyze_values_accepted=analyze_values_to_use)
  131. def get_measu_label_for_current_animal(self, measu):
  132. """
  133. Returns the measurement label for the measurement corresponding to <measu> based of current flag settings.
  134. :raises: AssertionError if animal is not initialized
  135. :return: str, measurent label
  136. """
  137. self.check_if_animal_is_initialized()
  138. return self.flags.get_measurement_label(measurement_row=self.measurement_list.get_row_by_measu(measu))
  139. def load_measurement_data_from_current_animal(self, measu):
  140. """
  141. Loads the measurement from current animal with the specified <measu>
  142. :param measu: int
  143. :return: str, label of the measurement as specified by the flag "LE_labelColumns"
  144. """
  145. self.check_if_animal_is_initialized()
  146. self.flags.update_flags({"STG_Measu": measu})
  147. measu_label, self.p1 = self.measurement_list.load_data(flags=self.flags, measu=measu)
  148. return measu_label
  149. def calculate_signals(self):
  150. """
  151. Calculates signals using the raw data currently loaded and using current flag values. Raises an ValueError if
  152. no raw data has been loaded
  153. :returns: None
  154. """
  155. if self.p1 is not None:
  156. self.p1.calculate_signals(self.flags)
  157. else:
  158. raise ValueError("No raw data has been loaded. Load some raw data and try calculating signals again!")
  159. def load_measurement_data(self, animal, measu):
  160. """
  161. Loads the measurement with the specified <measu> for the specified animal.
  162. :param animal: str, name of the animal, (usually name of the list file/vws file without extension)
  163. :param measu: int
  164. :return: str, label of the measurement as specified by the flag "LE_labelColumns"
  165. """
  166. # self.measurement_list can be not None only if an animal had been initialized and
  167. # when an animal has been initialized, flag 'STG_ReportTag' gets set
  168. if not(self.measurement_list is not None and self.flags["STG_ReportTag"] == animal):
  169. self.initialize_animal(animal)
  170. return self.load_measurement_data_from_current_animal(measu)
  171. def load_measurement_data_without_list_file(
  172. self, raw_data_files, sampling_rate, LE_loadExp, animal='unspecified_animal'):
  173. """
  174. Load data into VIEW directly from raw data files without needing measurement list files
  175. :param sequence raw_data_files: list of raw data files. Must be compatible with the flag `LE_loadExp`
  176. :param float sampling_rate: in Hz, number of frames measured per second
  177. :param LE_loadExp: value of the flag of the same name, please see its documentation
  178. :param str animal: optional animal tag
  179. """
  180. self.flags.update_flags({"LE_loadExp": LE_loadExp})
  181. self.p1 = get_empty_p1(LE_loadExp=self.flags["LE_loadExp"])
  182. # needed for looking if a usable area file exists
  183. self.flags.update_flags({'STG_ReportTag': animal})
  184. self.p1.load_without_metadata(filenames=raw_data_files, flags=self.flags, sampling_rate=sampling_rate)
  185. def export_movie_for_current_measurement(self):
  186. """
  187. Export a movie with the current flag settings and for the measurement data currently loaded
  188. :return: str, path of the movie output file written or output directory for when mv_exportFormat='single_tif'
  189. """
  190. if self.p1.sig1 is None:
  191. self.calculate_signals()
  192. movie_dir_path = pl.Path(self.flags.get_op_movie_dir())
  193. movie_dir_path.mkdir(parents=True, exist_ok=True)
  194. measurement_row = self.measurement_list.get_row_by_measu(self.flags["STG_Measu"])
  195. user_spec_label = self.flags.get_measurement_label(measurement_row)
  196. op_filepath_stem = str(movie_dir_path / user_spec_label)
  197. if self.p1.sig1 is None:
  198. self.calculate_signals()
  199. return export_movie(flags=self.flags, p1=self.p1,
  200. full_filename_without_extension=op_filepath_stem)
  201. def get_roi_info_for_current_animal(self):
  202. """
  203. Reads and returns ROI information for the current animal
  204. :return: roi_data_dict, roi_file
  205. roi_data_dict: dict with roi labels as keys and roi data objects as values
  206. roi_file: str, file from which ROI information was taken
  207. """
  208. roi_data_dict, roi_file = get_roi_io_class(self.flags["RM_ROITrace"]).read(flags=self.flags)
  209. return roi_data_dict, roi_file
  210. def get_gdm_file_for_current_measurement(self, roi_data_dict=None):
  211. """
  212. Creates and returns a GDMFile object containing a list of GDMRow object, each of which contains metadata and
  213. the time trace associated with a ROI.
  214. If <roi_data_dict> is None, ROIs are interpreted using the flags RM_ROITrace
  215. :param dict roi_data_dict: dict with roi labels as keys and roi data objects as values
  216. :return: roi_data_dict, gdm_file
  217. roi_data_dict: a dictionary with ROI labels as keys and ROI data objects as values
  218. gdm_file: view.python_core.gdm_generation.gdm_data_classes.GDMFile object
  219. """
  220. roi_label_gdm_traces_dict, roi_data_dict = self.get_roi_gdm_traces_dict(roi_data_dict)
  221. gdm_file = get_gdm_file(p1=self.p1, flags=self.flags)
  222. return gdm_file, roi_data_dict
  223. def get_roi_gdm_traces_dict(self, roi_data_dict=None):
  224. """
  225. Returns a dictionary of roi labels and corresponding time traces as numpy arrays.
  226. If <roi_data_dict> is None, ROIs are interpreted using the flags RM_ROITrace
  227. :param dict roi_data_dict: dict with roi labels as keys and roi data objects as values
  228. :return: roi_data_dict, gdm_file
  229. roi_data_dict: a dictionary with ROI labels as keys and ROI data objects as values
  230. roi_label_gdm_traces_dict: a dictionary with ROI labels as keys and corresponding time traces as values
  231. """
  232. if self.p1.sig1 is None:
  233. self.calculate_signals()
  234. if roi_data_dict is None:
  235. roi_data_dict, roi_file = get_roi_io_class(self.flags["RM_ROITrace"]).read(
  236. flags=self.flags, measurement_label=self.p1.metadata.ex_name)
  237. roi_label_gdm_traces_dict = get_roi_gdm_traces_dict(p1=self.p1, flags=self.flags, roi_data_dict=roi_data_dict)
  238. return roi_label_gdm_traces_dict, roi_data_dict
  239. def generate_ctv_response_frame_for_current_measurement(self):
  240. """
  241. Reduce 3D signal into 2D response frame by applying CTV function as defined by flags
  242. :return: response_frame, 2D numpy.ndarray
  243. """
  244. if self.p1.sig1 is None:
  245. self.calculate_signals()
  246. ctv_handler = get_ctv_handler(flags=self.flags, p1=self.p1)
  247. return ctv_handler.apply(self.p1.sig1)
  248. def generate_overview_for_current_measurement(self):
  249. """
  250. Generate an overview of the measurement data currently loaded based on current flags and returns it
  251. :return: overview_frame, data_limits, overview_generator_used
  252. overview_frame: 2D numpy.ndarray, the overview image in XY format with origin at bottom left
  253. data_limits: tuple, the lower and upper limits of data in overview image
  254. overview_generator_used: OverviewColorizerAnnotator object used to generate overview
  255. """
  256. if self.p1.sig1 is None:
  257. self.calculate_signals()
  258. return generate_overview_image(flags=self.flags, p1=self.p1)
  259. def generate_overview_for_output_for_current_measurement(self):
  260. """
  261. Generates overview frame and transforms it so that it can be readily used either for plt.imshow or for
  262. saving with tifffile.imsave
  263. :return: overview_frame, data_limits
  264. overview_frame: 2D numpy.ndarray, the overview image in YX format with origin at top right
  265. data_limits: tuple, the lower and upper limits of data in overview image
  266. """
  267. if self.p1.sig1 is None:
  268. self.calculate_signals()
  269. return generate_overview_image_for_output(flags=self.flags, p1=self.p1)
  270. def backup_files(self, files, target_directory):
  271. """
  272. Copies all files in <files> to the target directory, inserting "last_used" before suffix
  273. :param files: list of strings, each pointing to a file on the file system
  274. :param target_directory: str, pointing to a directory on the file system
  275. """
  276. for file in files:
  277. file_path = pl.Path(file)
  278. target_filename = f"{file_path.stem}_last_used{file_path.suffix}"
  279. target_file = pl.Path(target_directory) / target_filename
  280. logging.getLogger("VIEW").info(f"Backing up {file} to {target_file}")
  281. shutil.copy(file, str(target_file))
  282. def backup_script_flags_configs_for_GDMs(self, files):
  283. """
  284. Copies all files in <files> to the traces output folder of the animal currently initialized
  285. :param files: list of strings, each pointing to a file on the file system
  286. """
  287. target_directory = self.flags.get_animal_op_dir()
  288. self.backup_files(files + [self.log_file], target_directory)
  289. def backup_script_flags_configs_for_movies(self, files):
  290. """
  291. Copies all files in <files> to the movies output folder of the animal currently initialized
  292. :param files: list of strings, each pointing to a file on the file system
  293. """
  294. target_directory = self.flags.get_op_movie_dir()
  295. self.backup_files(files + [self.log_file], target_directory)
  296. def backup_script_flags_configs_for_tapestries(self, files):
  297. """
  298. Copies all files in <files> to the tapestries output folder for the animal <animal>
  299. :param files: list of strings, each pointing to a file on the file system
  300. """
  301. # view_obj = cls()
  302. # view_obj.update_flags_from_ymlfile(yml_file)
  303. # view_obj.initialize_animal(animal)
  304. #
  305. target_directory = self.flags.get_op_tapestries_dir()
  306. #
  307. # # logging does not write anything if the created object is not returned?? Manually writing version
  308. # with open(view_obj.log_file_obj.name, 'w') as fh:
  309. # fh.write(f"VIEW object initialized for offline use. Version: {get_distribution('view').version}")
  310. self.backup_files(files + [self.log_file], target_directory)
  311. def get_current_setup_extension(self):
  312. return get_setup_extension(self.flags["LE_loadExp"])
  313. def get_measu_label(self, measu):
  314. return self.flags.get_measurement_label(measurement_row=self.measurement_list.get_row_by_measu(measu))
  315. def backup_script_flags_configs_for_processed_data_output(self, files, format_name):
  316. """
  317. Copies all files in <files> to the output folder for processed data for format in `format_name`.
  318. :param files: list of strings, each pointing to a file on the file system
  319. """
  320. target_directory = self.flags.get_processed_data_op_path(format_name)
  321. self.backup_files(files + [self.log_file], target_directory)