old_file_handler.py 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103
  1. import pathlib as pl
  2. import datetime
  3. import pandas as pd
  4. import typing
  5. import logging
  6. from .measurement_list.io import get_ext_based_values
  7. import shutil
  8. def get_old_file_handler(fle: str):
  9. lst_path = pl.Path(fle)
  10. if lst_path.is_file():
  11. try:
  12. return OldFileHandlerMeasurementList(fle)
  13. except NotImplementedError as nie:
  14. return OldFileHandler(fle)
  15. else:
  16. return OldFileHandlerBlank()
  17. class OldFileHandlerBlank(object):
  18. def __init__(self):
  19. super().__init__()
  20. pass
  21. def backup(self):
  22. pass
  23. def write_old_values(self, df, columns, measu_col_name: str = "Measu", label_col_name: str = "Label"):
  24. return df
  25. class OldFileHandler(OldFileHandlerBlank):
  26. def __init__(self, fle: str):
  27. super().__init__()
  28. self.fle = fle
  29. def backup(self):
  30. lst_fle_path = pl.Path(self.fle)
  31. backup_path = \
  32. lst_fle_path.with_suffix(f".{datetime.datetime.now().strftime('%Y%m%d%H%M%S')}{lst_fle_path.suffix}")
  33. if backup_path.is_file():
  34. backup_path.unlink()
  35. shutil.copy(src=self.fle, dst=str(backup_path))
  36. logging.getLogger("VIEW").info(f"Backing up file to {backup_path}")
  37. class OldFileHandlerMeasurementList(OldFileHandler):
  38. def __init__(self, lst_fle: str):
  39. super().__init__(lst_fle)
  40. self.io_class, _, _ = get_ext_based_values(lst_fle)
  41. self.old_lst_df = self.io_class.read(lst_fle)
  42. if "index" in self.old_lst_df.columns:
  43. del self.old_lst_df["index"]
  44. def write_old_values(self, df, columns: typing.Iterable[str], measu_col_name: str = "Measu",
  45. label_col_name: str = "Label"):
  46. """
  47. For every column in <columns>, the values in the column of old measurement file, if it exists, will be copied
  48. into df
  49. :param df: pandas.DataFrame
  50. :param columns: iterable of strings
  51. :param measu_col_name: string, name of the "Measu" column
  52. :param label_col_name: string, name of the "Label" column
  53. :return: pandas.DataFrame
  54. """
  55. columns = list(columns)
  56. if measu_col_name not in columns:
  57. columns.append(measu_col_name)
  58. # restrict columns to be the intersection of
  59. columns = set(columns).intersection(set(self.old_lst_df.columns.values))
  60. mask = self.old_lst_df[measu_col_name].apply(lambda x: x in df[measu_col_name].values)
  61. indices2use = mask.index.values[mask.values].tolist()
  62. old_df_subset = self.old_lst_df.reindex(index=indices2use, columns=columns).set_index(measu_col_name)
  63. df_temp = df.set_index(measu_col_name)
  64. combined_df = df_temp.combine(old_df_subset, func=lambda s1, s2: s2, overwrite=False)
  65. return combined_df.reset_index()
  66. def backup_if_existing(filename):
  67. old_file_handler = get_old_file_handler(filename)
  68. old_file_handler.backup()