123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523 |
- import pathlib as pl
- from tillvisionio.vws import VWSDataManager
- import pandas as pd
- import tifffile
- from view.python_core.misc import excel_datetime
- import typing
- import easygui
- import logging
- import pprint
- from abc import ABC, abstractmethod
- import xml.etree.ElementTree as ET
- import datetime
def calculate_dt_from_timing_ms(timing_ms: str) -> float:
    """
    Calculate the average inter-frame interval from a string of frame times.

    :param str timing_ms: frame times separated by whitespace, e.g. the "Timing_ms" entry of a measurement
    :rtype: float
    :return: (time of last frame - time of first frame) / (number of frames - 1), in the unit of the input values
    :raises ValueError: if <timing_ms> contains no entries or an entry that cannot be converted to float
    :raises ZeroDivisionError: if <timing_ms> contains exactly one frame time
    """
    # split() without an argument tolerates repeated spaces, tabs and leading/trailing whitespace,
    # whereas split(' ') produced empty strings that float() could not convert
    times = [float(x) for x in timing_ms.split()]
    if not times:
        raise ValueError("No frame times found in timing string")

    # calculate frame interval as time of (last frame - first frame) / (frames - 1)
    return (times[-1] - times[0]) / (len(times) - 1)
# hook handed to the automatic parsing of metadata from vws.log files; it contributes extra columns
def additional_cols_func(s):
    """
    Compute the additional metadata columns "dt" and "Analyze" for one measurement.

    :param s: mapping-like measurement entry; its "Timing_ms" value is used to derive the frame interval
    :rtype: dict
    :return: {"dt": <frame interval>, "Analyze": 1} if the frame interval could be calculated,
        else {"dt": -1, "Analyze": 0}
    """
    try:
        frame_interval = calculate_dt_from_timing_ms(s["Timing_ms"])
    except Exception:
        # deliberately best effort: any problem with the timing info marks the measurement as not analyzable
        return {"dt": -1, "Analyze": 0}

    # a frame interval could be calculated, so there are at least two frames: worth analyzing
    return {"dt": frame_interval, "Analyze": 1}
class BaseImporter(ABC):
    """
    Abstract base class of setup specific importers that collect measurement metadata into a DataFrame.

    Concrete subclasses set the attributes initialized to None below and implement the abstract methods.
    """

    def __init__(self, default_values: typing.Mapping):
        """
        :param Mapping default_values: column name -> default value, used to pre-fill every metadata row
        """
        super().__init__()
        self.default_values = default_values
        # possible extensions of files containing metadata
        self.associated_extensions = None
        # short text describing raw data files
        self.associate_file_type = None
        # associated value of the flag LE_loadExp
        self.LE_loadExp = None
        # possible extensions of files containing data (calcium imaging movies)
        self.movie_data_extensions = None

    def get_default_row(self):
        """
        :rtype: pd.Series
        :return: a new Series initialized with the default values passed at construction
        """
        return pd.Series(self.default_values)

    def import_metadata(self, raw_data_files, measurement_filter):
        """
        Parse the metadata of all files in <raw_data_files> and stack them into one DataFrame.

        :param raw_data_files: iterable of metadata file paths
        :param measurement_filter: passed on unchanged to `parse_metadata`
        :rtype: pd.DataFrame
        :return: rows of all files in the order of <raw_data_files>, with a fresh 0..n-1 index;
            an empty DataFrame if <raw_data_files> is empty
        """
        logger = logging.getLogger("VIEW")

        parsed_frames = []
        for fle_ind, fle in enumerate(raw_data_files):
            logger.info(f"Parsing metadata from {fle}")
            parsed_frames.append(self.parse_metadata(fle, fle_ind, measurement_filter))

        # pd.concat refuses an empty list
        if not parsed_frames:
            return pd.DataFrame()

        # DataFrame.append was removed in pandas 2.0; a single concat is equivalent and avoids repeated copying
        return pd.concat(parsed_frames, ignore_index=True)

    def get_filetype_info_string(self):
        """
        :rtype: list
        :return: one "*<extension>" mask per associated extension, followed by the file type description
        """
        return [f"*{x}" for x in self.associated_extensions] + [self.associate_file_type]

    def ask_for_files(self, default_dir, multiple: bool = True) -> dict:
        """
        Open a file dialog for choosing raw data files and map them to animal tags.

        :param default_dir: directory in which the dialog starts; the (first) chosen file must lie within it
        :param bool multiple: whether more than one file may be selected
        :rtype: dict
        :return: the result of `get_animal_tag_raw_data_mapping` for the chosen files
        :raises IOError: if the user aborts the dialog
        :raises AssertionError: if the (first) chosen file is not located in <default_dir>
        """
        default_dir_str = str(pl.Path(default_dir) / "*")
        files_chosen = easygui.fileopenbox(
            title=f"Choose one or more files for LE_loadExp={self.LE_loadExp}",
            filetypes=self.get_filetype_info_string(),
            multiple=multiple,
            default=default_dir_str)

        if files_chosen is None:
            raise IOError("User Abort while choosing files.")

        # with multiple=False the dialog returns a single path string instead of a list of paths;
        # normalize so that indexing below yields a path and not a single character
        if isinstance(files_chosen, str):
            files_chosen = [files_chosen]

        # raised explicitly (instead of an assert statement) so that the check also runs under "python -O"
        if not files_chosen[0].startswith(str(default_dir)):
            raise AssertionError(
                f"The data selected in not in the expected data directory of the current tree:\n"
                f"{default_dir}. Please copy your data there and try again!")

        animal_tag_raw_data_mapping = self.get_animal_tag_raw_data_mapping(files_chosen)
        logging.getLogger("VIEW").info(
            f"Working on the following animal tags and their corresponding files:\n"
            f"{pprint.pformat(animal_tag_raw_data_mapping)}")
        return animal_tag_raw_data_mapping

    @abstractmethod
    def parse_metadata(self, fle: str, fle_ind: int,
                       measurement_filter: typing.Callable[[pd.Series], bool]) -> pd.DataFrame:
        """
        Reads and returns the metadata from a metadata file
        :param str fle: path of a metadata file
        :param int fle_ind: integer representing the row order of the measurement associated with <fle>,
            if it is part of a series
        :param Callable measurement_filter: only used for Till Vision setups.
            See tillvisionio.VWSDataManager.get_all_metadata
        :rtype: pd.DataFrame
        :return: the columns of the DataFrame returned must be a subset of the metadata columns defined in
            `view/flags_and_metadata_definitions/metadata_definition.csv`
        """
        pass

    @abstractmethod
    def get_animal_tag_raw_data_mapping(self, files_chosen: list) -> dict:
        """
        Parses the animal tag from raw data file names (<files_chosen>). Revises the raw data file names
        if necessary. Returns a dictionary with animal tags as keys and lists of (revised) raw data files
        as values.
        :param list files_chosen: list of raw data file names
        :rtype: dict
        """
        pass

    @abstractmethod
    def get_path_relative_to_data_dir(self, fle):
        """
        Creates a string representing the path of the raw data file <fle> relative to the data directory
        represented by the flag "STG_Datapath" (Eg.: "01_DATA")
        :param fle: path of the raw data file as parsed from the metadata file
        :rtype: str
        """
        pass
class TillImporter(BaseImporter, ABC):
    """
    Functionality shared by the importers that read metadata from VWS log files (".vws.log").
    """

    def __init__(self, default_values: typing.Mapping):
        super().__init__(default_values)
        self.movie_data_extensions = [".pst", ".ps"]
        self.associated_extensions = [".vws.log"]
        self.associate_file_type = "VWS Log Files"

    def get_animal_tag_raw_data_mapping(self, files_chosen: list) -> dict:
        """
        Map each chosen file to an animal tag taken from its file name.

        :param list files_chosen: list of metadata file names
        :rtype: dict
        :return: one entry per file: the part of the file name before the first "." as key and a
            one-element list containing the file as value; empty if <files_chosen> is empty
        """
        if len(files_chosen) == 0:
            return {}
        return {pl.Path(chosen).name.split(".")[0]: [chosen] for chosen in files_chosen}

    def get_path_relative_to_data_dir(self, fle):
        """
        :param fle: path of a raw data file, interpreted as a Windows path
        :rtype: tuple
        :return: (1, "<parent directory>/<file name without extension>") if <fle> ends with one of
            the movie data extensions, else (0, "wrong extension")
        """
        has_known_extension = any(fle.endswith(extension) for extension in self.movie_data_extensions)
        if not has_known_extension:
            return 0, "wrong extension"

        windows_path = pl.PureWindowsPath(fle)
        relative_path = pl.Path(windows_path.parts[-2]) / windows_path.stem
        return 1, str(relative_path)

    def convert_vws_names_to_lst_names(self, vws_measurement_series, default_row):
        """
        Convert values from vws.log nomenclature to internal measurement list nomenclature
        :param vws_measurement_series: pandas.Series, metadata of one measurement
        :param default_row: pandas.Series with default values
        :return: pandas.DataFrame with a single row
        """
        logger = logging.getLogger("VIEW")
        logger.info(f'Parsing measurement with label {vws_measurement_series["Label"]}')

        # NOTE: the order of the assignments below determines the order of columns missing in <default_row>
        lst_line = default_row.copy()
        lst_line['Measu'] = vws_measurement_series['index'] + 1
        lst_line['Label'] = vws_measurement_series['Label']

        data_file = vws_measurement_series["Location"]
        if data_file[-2:] == 'ps':
            # there is one version of the macro in tillVision that "eats" the last t of the file name
            logger.warning('adding a t to the .ps file name to make it .pst')
            data_file = data_file + 't'

        analyze, dbb1_relative = self.get_path_relative_to_data_dir(data_file)
        if analyze == 0:
            logger.warning(
                f"Data file {data_file} not found! Setting analyze=0 for this measurement")

        lst_line['DBB1'] = dbb1_relative
        # a measurement is only analyzed if both the default value and the path check allow it
        lst_line["Analyze"] = analyze * int(lst_line.get("Analyze", 1))
        lst_line['Cycle'] = vws_measurement_series["dt"]
        lst_line['Lambda'] = vws_measurement_series['MonochromatorWL_nm']
        lst_line['UTC'] = vws_measurement_series['UTCTime']

        return pd.DataFrame(lst_line).T

    def get_mtime(self, utc, first_utc):
        """
        :param utc: time of a measurement, in seconds
        :param first_utc: reference time, in seconds
        :rtype: str
        :return: the part following " days " in the string representation of the time elapsed
            between <first_utc> and <utc>
        """
        elapsed = pd.to_timedelta(utc - first_utc, unit="s")
        elapsed_parts = str(elapsed).split(" days ")
        return elapsed_parts[1]
class TillImporterOneWavelength(TillImporter):
    """
    Importer for VWS log files of single wavelength measurements (LE_loadExp=3).
    """

    def __init__(self, default_values: typing.Mapping):
        super().__init__(default_values)
        self.LE_loadExp = 3

    # for till data, metadata is contained in vws.log file
    def parse_metadata(self, fle: str, fle_ind: int,
                       measurement_filter: typing.Callable[[pd.Series], bool]) -> pd.DataFrame:
        """
        Read the metadata of all measurements of a vws.log file that pass <measurement_filter>.

        :param str fle: path of a vws.log file
        :param int fle_ind: not used by this importer
        :param Callable measurement_filter: passed as `filter` to `VWSDataManager.get_all_metadata`
        :rtype: pd.DataFrame
        :return: one row per measurement; an empty DataFrame if no measurement was found
        """
        vws_manager = VWSDataManager(fle)
        measurements = vws_manager.get_all_metadata(filter=measurement_filter,
                                                    additional_cols_func=additional_cols_func)
        first_utc = vws_manager.get_earliest_utc()

        if len(measurements) == 0:
            logging.getLogger("VIEW").warning(
                f"In {fle}: No usable measurements found for given 'measurement_filter' function")

        lst_lines = []
        for _, measurement_row in measurements.iterrows():
            lst_line = self.convert_vws_names_to_lst_names(vws_measurement_series=measurement_row,
                                                           default_row=self.get_default_row())
            # lst_line is a one-row DataFrame, so take its only UTC value by position
            lst_line["MTime"] = self.get_mtime(utc=lst_line["UTC"].iloc[0], first_utc=first_utc)
            lst_lines.append(lst_line)

        # pd.concat refuses an empty list
        if not lst_lines:
            return pd.DataFrame()

        # DataFrame.append was removed in pandas 2.0; concatenate all rows at once instead
        return pd.concat(lst_lines, ignore_index=True)
class TillImporterTwoWavelength(TillImporter):
    """
    Importer for VWS log files of two wavelength (340/380) measurements (LE_loadExp=4).
    """

    def __init__(self, default_values: typing.Mapping):
        super().__init__(default_values)
        self.LE_loadExp = 4

    def parse_metadata(self, fle: str, fle_ind: int,
                       measurement_filter: typing.Callable[[pd.Series], bool]) -> pd.DataFrame:
        """
        Read the metadata of all pairs of 340 and 380 measurements of a vws.log file.

        :param str fle: path of a vws.log file
        :param int fle_ind: not used by this importer
        :param Callable measurement_filter: passed as `filter` to
            `VWSDataManager.get_metadata_two_wavelengths`
        :rtype: pd.DataFrame
        :return: two rows per measurement pair: first the 340 row, whose "dbb2" holds the "DBB1" of its
            380 partner, then the 380 row with "Analyze" set to 0; an empty DataFrame if there are no pairs
        """
        vws_manager = VWSDataManager(fle)
        measurements_wl340_df, measurements_wl380_df \
            = vws_manager.get_metadata_two_wavelengths(wavelengths=(340, 380), filter=measurement_filter,
                                                       additional_cols_func=additional_cols_func)
        first_utc = vws_manager.get_earliest_utc()

        lst_lines = []
        # measurements are paired by position; zip stops at the shorter of the two tables
        for (_, measurement_wl340), (_, measurement_wl380) in zip(measurements_wl340_df.iterrows(),
                                                                  measurements_wl380_df.iterrows()):
            lst_line_wl340 = self.convert_vws_names_to_lst_names(measurement_wl340, self.get_default_row())
            lst_line_wl380 = self.convert_vws_names_to_lst_names(measurement_wl380, self.get_default_row())

            # the 340 row references the data of its 380 partner and is the one to be analyzed
            lst_line_wl340["dbb2"] = lst_line_wl380["DBB1"]
            lst_line_wl340["MTime"] = self.get_mtime(utc=lst_line_wl340["UTC"].iloc[0], first_utc=first_utc)
            lst_line_wl380["Analyze"] = 0
            lst_line_wl380["MTime"] = self.get_mtime(utc=lst_line_wl380["UTC"].iloc[0], first_utc=first_utc)

            lst_lines.append(lst_line_wl340)
            lst_lines.append(lst_line_wl380)

        # pd.concat refuses an empty list
        if not lst_lines:
            return pd.DataFrame()

        # DataFrame.append was removed in pandas 2.0; concatenate all rows at once instead
        return pd.concat(lst_lines, ignore_index=True)
class LSMImporter(BaseImporter):
    """
    Importer for Zeiss LSM files (LE_loadExp=20); each .lsm file holds both data and metadata.
    """

    def __init__(self, default_values: typing.Mapping):
        super().__init__(default_values)
        self.associate_file_type = "Zeiss LSM files"  # short text describing raw data files
        self.associated_extensions = [".lsm"]  # possible extensions of files containing metadata
        self.movie_data_extensions = [".lsm"]  # possible extension of file containing data (calcium imaging movies)
        self.LE_loadExp = 20  # associated value of the flag LE_loadExp

    def get_animal_tag_raw_data_mapping(self, files_chosen: list) -> dict:
        """
        Map the chosen LSM files, which must all lie in one directory, to a single animal tag.

        Without this method the class could not be instantiated, since the base class declares it abstract.

        :param list files_chosen: list of raw data file names
        :rtype: dict
        :return: empty if <files_chosen> is empty, else a one-element dictionary with the name of the
            grandparent directory of the files as key and <files_chosen> as value
        :raises AssertionError: if the files do not all belong to the same directory
        """
        # NOTE(review): the animal tag is taken from the grandparent directory, i.e. the first path
        # component used by get_path_relative_to_data_dir -- confirm against the expected data layout
        if len(files_chosen) == 0:
            return {}

        parents = [pl.Path(fle).parent for fle in files_chosen]
        if any(x != parents[0] for x in parents):
            raise AssertionError(f"LSM files specified for constructing measurement "
                                 f"list file do not belong to the same directory: "
                                 f"{files_chosen}")
        return {parents[0].parent.name: files_chosen}

    def get_path_relative_to_data_dir(self, fle):
        """
        :param fle: path of a raw data file, interpreted as a Windows path
        :rtype: tuple
        :return: (1, "<grandparent directory>/<parent directory>/<file name without extension>") if <fle>
            ends with one of the movie data extensions, else (0, -1)
        """
        for movie_data_extension in self.movie_data_extensions:
            if fle.endswith(movie_data_extension):
                fle_path = pl.PureWindowsPath(fle)
                return 1, str(pl.Path(fle_path.parts[-3]) / fle_path.parts[-2] / fle_path.stem)

        # none of the known extensions matched
        return 0, -1

    def convert_lsm_metadata_to_lst_row(self, measu, fle, lsm_metadata, default_row):
        """
        Convert values from lsm_metadata to .lst nomenclature
        :param measu: value for the column "Measu"
        :param fle: path of the LSM file, used to fill the columns "DBB1" and "Analyze"
        :param lsm_metadata: dict, like the one returned by tifffile.TiffFile.lsm_metadata
        :param default_row: pandas.Series, with default values
        :return: pandas.DataFrame with a single row
        """
        lst_line = default_row.copy()
        lst_line["Label"] = lsm_metadata["ScanInformation"]["Name"]
        # converting from seconds to milliseconds
        lst_line["Cycle"] = lsm_metadata["TimeIntervall"] * 1000
        lst_line["Lambda"] = lsm_metadata["ScanInformation"]["Tracks"][0]["IlluminationChannels"][0]["Wavelength"]
        lst_line['UTC'] = excel_datetime(lsm_metadata["ScanInformation"]["Sample0time"]).timestamp()
        # convert from meters to micrometers
        lst_line["PxSzX"] = lsm_metadata["VoxelSizeX"] / 1e-6
        lst_line["PxSzY"] = lsm_metadata["VoxelSizeY"] / 1e-6

        analyze, dbb1_relative = self.get_path_relative_to_data_dir(fle)
        lst_line["DBB1"] = dbb1_relative
        lst_line["Analyze"] = analyze
        lst_line["Measu"] = measu

        return pd.DataFrame(lst_line).T

    # for LSM data, a single raw data file is a .lsm file, which also contains the metadata
    def parse_metadata(self, fle: str, fle_ind: int,
                       measurement_filter: typing.Callable[[pd.Series], bool] = True) -> pd.DataFrame:
        """
        Read the metadata of one LSM file.

        :param str fle: path of an LSM file
        :param int fle_ind: zero-based position of <fle> in its series; "Measu" is set to <fle_ind> + 1
        :param measurement_filter: not used by this importer
        :rtype: pd.DataFrame
        :return: DataFrame with a single row
        """
        # the context manager makes sure the file handle is released again
        with tifffile.TiffFile(fle) as tif:
            lsm_metadata = tif.lsm_metadata

        return self.convert_lsm_metadata_to_lst_row(measu=fle_ind + 1,
                                                    fle=fle,
                                                    lsm_metadata=lsm_metadata,
                                                    default_row=self.get_default_row())
class P1DualWavelengthTIFSingleFileImporter(BaseImporter):
    """
    Importer for single tiff files with dual wavelength recordings, as used in Trondheim (added Dec 2021).

    init in view uses read_single_file_fura_tif(filename)
    """

    def __init__(self, default_values: typing.Mapping):
        super().__init__(default_values)
        self.associate_file_type = "Dual Wavelength Tif files"  # short text describing raw data files
        self.associated_extensions = [".tif"]  # possible extensions of files containing metadata
        self.movie_data_extensions = [".tif"]  # possible extension of file containing data (calcium imaging movies)
        self.LE_loadExp = 35  # associated value of the flag LE_loadExp

    def get_path_relative_to_data_dir(self, fle):
        """
        :param fle: path of a raw data file, interpreted as a Windows path
        :rtype: tuple
        :return: (1, "<grandparent directory>/<parent directory>/<file name without extension>") if <fle>
            ends with one of the movie data extensions, else (0, -1)
        """
        for movie_data_extension in self.movie_data_extensions:
            if fle.endswith(movie_data_extension):
                fle_path = pl.PureWindowsPath(fle)
                return 1, str(pl.Path(fle_path.parts[-3]) / fle_path.parts[-2] / fle_path.stem)

        # none of the known extensions matched
        return 0, -1

    def convert_metadata_to_lst_row(self, measu, fle, meta_info, default_row):
        """
        Convert values from meta_info to .lst nomenclature
        :param measu: value for the column "Measu"
        :param fle: path of the tif file, only used to decide on the value of "Analyze"
        :param meta_info: dict, as assembled by `parse_metadata` of this class
        :param default_row: pandas.Series, with default values
        :return: pandas.DataFrame with a single row
        """
        lst_line = default_row.copy()
        lst_line["Label"] = meta_info['Label']
        # values are copied as found in <meta_info>; unit conversions are done in parse_metadata
        lst_line["Cycle"] = meta_info['GDMfreq']
        lst_line["Lambda"] = "340/380"
        lst_line['UTC'] = meta_info['UTCTime']
        lst_line["PxSzX"] = meta_info['PsSzX']
        lst_line["PxSzY"] = meta_info['PsSzY']

        # only the extension check is needed here, the relative path is taken from <meta_info>
        analyze, _ = self.get_path_relative_to_data_dir(fle)
        # both wavelengths are stored in the same file
        lst_line["DBB1"] = meta_info['dbb']
        lst_line["dbb2"] = meta_info['dbb']
        lst_line["Analyze"] = analyze
        lst_line["Measu"] = measu
        # additional info
        lst_line["ExposureTime_ms"] = meta_info['ExposureTime_ms']
        lst_line["AcquisitionDate"] = meta_info['AcquisitionDate']
        lst_line["Binning"] = meta_info['Binning']
        lst_line["StartTime"] = meta_info['StartTime']

        return pd.DataFrame(lst_line).T

    # for this setup, a single tif file contains both the data and its metadata
    def parse_metadata(self, fle: str, fle_ind: int,
                       measurement_filter: typing.Callable[[pd.Series], bool] = True) -> pd.DataFrame:
        """
        Read the OME-XML metadata of one dual wavelength tif file.

        :param str fle: path of a tif file
        :param int fle_ind: zero-based position of <fle> in its series; "Measu" is set to <fle_ind> + 1
        :param measurement_filter: not used by this importer
        :rtype: pd.DataFrame
        :return: DataFrame with a single row
        """
        # load metadata
        tif_file = pl.Path(fle)
        with tifffile.TiffFile(tif_file) as tif:
            metadata = tif.imagej_metadata
            # imagej_metadata is None or a dict, neither of which can be parsed as XML;
            # in these cases read the XML from the description of the first frame
            if not isinstance(metadata, (str, bytes)):
                metadata = tif.pages[0].description

        # extract XML tree from metadata into root
        root = ET.fromstring(metadata)
        # define namespace for OME data
        # https://docs.python.org/3/library/xml.etree.elementtree.html#xml.etree.ElementTree.Element
        ns = {
            "d": "http://www.openmicroscopy.org/Schemas/OME/2013-06"
        }

        # now get all infos that we put into settings file.
        # Copy the attributes, so that the changes below do not modify the XML tree.
        # Example: {'ID': 'Pixels:1-0', 'DimensionOrder': 'XYTZC', 'Type': 'uint16', 'SizeX': '1392',
        #           'SizeY': '1040', 'SizeZ': '1', 'SizeC': '1', 'SizeT': '160', 'PhysicalSizeX': '6.45',
        #           'PhysicalSizeY': '6.45', 'PhysicalSizeZ': '1000', 'SignificantBits': '14'}
        meta_info = dict(root.find("./d:Image/d:Pixels", ns).attrib)

        # acquisition date as string, e.g. '2021-09-19T16:49:28'
        acquisition_date = root.find("./d:Image/d:AcquisitionDate", ns).text
        meta_info['AcquisitionDate'] = acquisition_date

        # binning info, e.g. '1x1'
        meta_info['Binning'] = root.find("./d:Image/d:Pixels/d:Channel/d:DetectorSettings", ns).attrib["Binning"]

        # Frame interval: since this is dual wavelength, take the time between the third and the fifth
        # plane (positions in the XPath predicate are 1-based). The first image looks unsafe, often it is
        # blank, therefore later planes are used.
        time_plane3 = root.find("./d:Image/d:Pixels/d:Plane[3]", ns).attrib["DeltaT"]
        time_plane5 = root.find("./d:Image/d:Pixels/d:Plane[5]", ns).attrib["DeltaT"]
        frame_interval_ms = round((float(time_plane5) - float(time_plane3)) * 1000)  # unit is ms, rounded
        meta_info['GDMfreq'] = str(frame_interval_ms)

        # Exposure times of the two wavelengths are taken from the third and fourth plane,
        # just in case the very first ones would be strange. The value in Andor is in seconds.
        exposure_plane3_s = float(root.find("./d:Image/d:Pixels/d:Plane[3]", ns).attrib["ExposureTime"])
        exposure_plane4_s = float(root.find("./d:Image/d:Pixels/d:Plane[4]", ns).attrib["ExposureTime"])
        exposure_time_ms_340 = int(1000 * exposure_plane3_s)
        exposure_time_ms_380 = int(1000 * exposure_plane4_s)
        meta_info['ExposureTime_ms'] = str(exposure_time_ms_340) + '/' + str(exposure_time_ms_380)

        # the tif file, including the last directory, e.g. '190815_h2_El/A_3.tif'
        path_parts = tif_file.parts
        meta_info['dbb'] = path_parts[-2] + '/' + path_parts[-1]
        meta_info['Label'] = path_parts[-1]

        # replace the Andor names 'PhysicalSizeX/Y' with the names 'PsSzX/Y' used in this module
        meta_info['PsSzX'] = meta_info.pop('PhysicalSizeX')
        meta_info['PsSzY'] = meta_info.pop('PhysicalSizeY')

        # When was this measurement taken?
        # Start with the time when the measurement was started and add the time of the third plane,
        # since measurement start time is equal for all measurements in one loop
        measurement_time = (datetime.datetime.fromisoformat(acquisition_date)
                            + datetime.timedelta(seconds=float(time_plane3)))
        # StartTime, e.g. 10:05:04
        meta_info['StartTime'] = measurement_time.strftime('%H:%M:%S')
        # UTC, e.g. 1623229504.482
        # NOTE(review): if the acquisition date carries no time zone, timestamp() interprets it as
        # local time of the machine running this code -- confirm that this is intended
        meta_info['UTCTime'] = measurement_time.timestamp()

        # example for the additional entries of meta_info now:
        # {'AcquisitionDate': '2019-08-14T14:44:29', 'Binning': '4x4', 'GDMfreq': '34',
        #  'ExposureTime_ms': '13/13', 'dbb': '190815_h2_El/A_3.tif', 'Label': 'A_3.tif',
        #  'PsSzX': '1.3', 'PsSzY': '1.3', 'StartTime': '14:44:29', 'UTCTime': 1565786669.06601}

        return self.convert_metadata_to_lst_row(measu=fle_ind + 1,
                                                fle=fle,
                                                meta_info=meta_info,
                                                default_row=self.get_default_row())

    def get_animal_tag_raw_data_mapping(self, files_chosen: list) -> dict:
        """
        Map the chosen tif files, which must all lie in one directory, to a single animal tag.

        :param list files_chosen: list of raw data file names
        :rtype: dict
        :return: empty if <files_chosen> is empty, else a one-element dictionary with the name of the
            grandparent directory of the files as key and <files_chosen> as value
        :raises AssertionError: if the files do not all belong to the same directory
        """
        if len(files_chosen) == 0:
            return {}

        parents = [pl.Path(fle).parent for fle in files_chosen]
        # raised explicitly (instead of an assert statement) so that the check also runs under "python -O"
        if not all(x == parents[0] for x in parents):
            raise AssertionError(f"Tif files specified for constructing measurement "
                                 f"list file do no belong to the same directory: "
                                 f"{files_chosen}")
        return {parents[0].parent.name: files_chosen}
def get_importer_class(LE_loadExp):
    """
    returns the importer class of the setup specified by <LE_loadExp>
    :param LE_loadExp: value of the flag of the same name
    :return: the importer class (not an instance of it)
    :raises NotImplementedError: if there is no importer for <LE_loadExp>
    """
    # Till Vision setups
    if LE_loadExp == 3:
        return TillImporterOneWavelength
    if LE_loadExp == 4:
        return TillImporterTwoWavelength

    # Zeiss LSM
    if LE_loadExp == 20:
        return LSMImporter

    # single tif file with dual wavelength
    if LE_loadExp == 35:
        return P1DualWavelengthTIFSingleFileImporter

    raise NotImplementedError
def get_setup_extension(LE_loadExp):
    """
    returns the possible file extensions of raw data files of the setup specified by <LE_loadExp>
    :param int LE_loadExp: value of the flag of the same name
    :rtype: list
    :raises NotImplementedError: if there is no importer for <LE_loadExp>
    """
    importer_class = get_importer_class(LE_loadExp)
    # the importers store their extensions in the attribute "movie_data_extensions" (plural);
    # the default values are irrelevant here, hence the empty mapping
    return importer_class({}).movie_data_extensions
|