Scheduled service maintenance on November 22


On Friday, November 22, 2024, between 06:00 CET and 18:00 CET, GIN services will undergo planned maintenance. Extended service interruptions should be expected. We will try to keep downtimes to a minimum, but recommend that users avoid critical tasks, large data uploads, or DOI requests during this time.

We apologize for any inconvenience.

utils.py 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311
  1. import time
  2. import unittest
  3. from typing import (
  4. cast,
  5. Any,
  6. List,
  7. Optional,
  8. Union,
  9. )
  10. from uuid import UUID
  11. from dataladmetadatamodel.datasettree import DatasetTree
  12. from dataladmetadatamodel.filetree import FileTree
  13. from dataladmetadatamodel.mappableobject import MappableObject
  14. from dataladmetadatamodel.metadata import Metadata
  15. from dataladmetadatamodel.metadatapath import MetadataPath
  16. from dataladmetadatamodel.metadatarootrecord import MetadataRootRecord
  17. from dataladmetadatamodel.mtreeproxy import MTreeProxy
  18. from dataladmetadatamodel.uuidset import UUIDSet
  19. from dataladmetadatamodel.versionlist import VersionList
  20. from dataladmetadatamodel.mapper.reference import Reference
# Template for deterministic test UUIDs: 32 hex digits (as required by
# uuid.UUID) with the index substituted twice, zero-padded to 4 hex digits.
uuid_pattern = "9900{:04x}00000000000000000000{:04x}"
# Template for deterministic fake locations/versions (git-sha-like hex
# string), again with the index substituted twice.
version_pattern = "ea{:04x}000000000000000000000000000000{:04x}"
# Default paths used by create_file_tree() when the caller supplies none.
# NOTE: mutable module-level lists — callers should not modify them.
default_file_tree_paths = [
    MetadataPath("a/b/c"),
    MetadataPath("a/b/a"),
    MetadataPath("b"),
    MetadataPath("c/d/e"),
    MetadataPath("a/x")]
# Default paths used by create_dataset_tree() when the caller supplies none.
default_dataset_tree_paths = [
    MetadataPath("d1"),
    MetadataPath("d1/d1.1"),
    MetadataPath("d2"),
    MetadataPath("d2/d2.1/d2.1.1"),
    MetadataPath("d3/d3.1")]
  35. def get_uuid(n: int) -> UUID:
  36. return UUID(uuid_pattern.format(n, n))
  37. def get_location(n: int) -> str:
  38. return version_pattern.format(n, n)
  39. def get_version(n: int) -> str:
  40. return get_location(n)
  41. def assert_equal(test_case: unittest.TestCase,
  42. object_a: Any,
  43. object_b: Any,
  44. _: bool,
  45. ):
  46. test_case.assertEqual(object_a, object_b)
  47. def assert_metadata_equal(test_case: unittest.TestCase,
  48. object_a: Any,
  49. object_b: Any,
  50. _: bool,
  51. ):
  52. test_case.assertEqual(object_a, object_b)
  53. def assert_mrr_equal(test_case: unittest.TestCase,
  54. a_mrr: MetadataRootRecord,
  55. b_mrr: MetadataRootRecord,
  56. a_purge_unsafe: bool,
  57. ):
  58. test_case.assertEqual(a_mrr.dataset_identifier, b_mrr.dataset_identifier)
  59. test_case.assertEqual(a_mrr.dataset_version, b_mrr.dataset_version)
  60. assert_mappable_objects_equal(
  61. test_case,
  62. a_mrr.dataset_level_metadata,
  63. b_mrr.dataset_level_metadata,
  64. a_purge_unsafe,
  65. assert_metadata_equal)
  66. assert_mappable_objects_equal(
  67. test_case,
  68. a_mrr._file_tree,
  69. b_mrr._file_tree,
  70. a_purge_unsafe,
  71. assert_file_trees_equal)
  72. def assert_mappable_objects_equal(test_case: unittest.TestCase,
  73. a_object: Union[MappableObject, MTreeProxy],
  74. b_object: Union[MappableObject, MTreeProxy],
  75. a_purge_unsafe: bool,
  76. equality_asserter):
  77. a_object = a_object.read_in()
  78. b_object = b_object.read_in()
  79. equality_asserter(test_case, a_object, b_object, a_purge_unsafe)
  80. if a_purge_unsafe is False:
  81. a_object.purge()
  82. b_object.purge()
  83. def assert_file_trees_equal(test_case: unittest.TestCase,
  84. a: FileTree,
  85. b: FileTree,
  86. unsafe: bool
  87. ):
  88. a_entries = list(a.get_paths_recursive())
  89. b_entries = list(b.get_paths_recursive())
  90. # Compare paths
  91. test_case.assertListEqual(
  92. sorted(list(map(lambda x: x[0], a_entries))),
  93. sorted(list(map(lambda x: x[0], b_entries))))
  94. # Compare metadata elements
  95. for a_object, b_object in zip(
  96. map(lambda x: x[1], a_entries),
  97. map(lambda x: x[1], b_entries)):
  98. # TODO: proper metadata comparison instead of assertEqual
  99. assert_mappable_objects_equal(
  100. test_case,
  101. a_object,
  102. b_object,
  103. unsafe,
  104. assert_equal)
  105. def assert_dataset_trees_equal(test_case: unittest.TestCase,
  106. a: DatasetTree,
  107. b: DatasetTree,
  108. a_purge_unsafe: bool
  109. ):
  110. a_entries = list(a.dataset_paths)
  111. b_entries = list(b.dataset_paths)
  112. # Compare paths
  113. test_case.assertListEqual(
  114. list(map(lambda x: x[0], a_entries)),
  115. list(map(lambda x: x[0], b_entries)))
  116. # Compare metadata_root_records
  117. for a_mrr, b_mrr in zip(
  118. map(lambda x: x[1], a_entries),
  119. map(lambda x: x[1], b_entries)):
  120. assert_mrr_equal(
  121. test_case,
  122. a_mrr.read_in(),
  123. b_mrr.read_in(),
  124. a_purge_unsafe)
  125. def assert_mrrs_equal(test_case: unittest.TestCase,
  126. a: MetadataRootRecord,
  127. b: MetadataRootRecord,
  128. unsafe: bool
  129. ):
  130. # Compare dataset level metadata
  131. assert_mappable_objects_equal(
  132. test_case,
  133. a.dataset_level_metadata,
  134. b.dataset_level_metadata,
  135. unsafe,
  136. assert_equal)
  137. # Compare file trees
  138. assert_mappable_objects_equal(
  139. test_case,
  140. a._file_tree,
  141. b._file_tree,
  142. unsafe,
  143. assert_file_trees_equal
  144. )
  145. # Compare the remaining metadata
  146. test_case.assertEqual(a.dataset_identifier, b.dataset_identifier)
  147. test_case.assertEqual(a.dataset_version, b.dataset_version)
  148. def assert_version_lists_equal(test_case: unittest.TestCase,
  149. a: VersionList,
  150. b: VersionList,
  151. unsafe: bool
  152. ):
  153. a_entries = [a.get_versioned_element(pdv) for pdv in a.versions()]
  154. b_entries = [b.get_versioned_element(pdv) for pdv in b.versions()]
  155. for a_entry, b_entry in zip(a_entries, b_entries):
  156. test_case.assertEqual(a_entry[0], b_entry[0])
  157. test_case.assertEqual(a_entry[1], b_entry[1])
  158. test_case.assertIsInstance(a_entry[2], (DatasetTree, MetadataRootRecord))
  159. test_case.assertIsInstance(b_entry[2], (DatasetTree, MetadataRootRecord))
  160. if isinstance(a_entry[2], DatasetTree):
  161. assert_dataset_trees_equal(
  162. test_case,
  163. cast(DatasetTree, a_entry[2]),
  164. cast(DatasetTree, b_entry[2]),
  165. unsafe)
  166. else:
  167. assert_mrr_equal(
  168. test_case,
  169. cast(MetadataRootRecord, a_entry[2]),
  170. cast(MetadataRootRecord, b_entry[2]),
  171. unsafe)
  172. def assert_uuid_sets_equal(test_case: unittest.TestCase,
  173. a_uuid_set: UUIDSet,
  174. b_uuid_set: UUIDSet):
  175. for dataset_id in a_uuid_set.uuids():
  176. a_version_list = a_uuid_set.get_version_list(dataset_id)
  177. b_version_list = b_uuid_set.get_version_list(dataset_id)
  178. assert_version_lists_equal(test_case,
  179. a_version_list,
  180. b_version_list,
  181. True)
  182. def create_file_tree(paths: Optional[List[MetadataPath]] = None) -> FileTree:
  183. paths = paths or default_file_tree_paths
  184. file_tree = FileTree()
  185. for path in paths:
  186. metadata = Metadata()
  187. file_tree.add_metadata(path, metadata)
  188. return file_tree
  189. def create_file_tree_with_metadata(paths: List[MetadataPath],
  190. metadata: List[Metadata]) -> FileTree:
  191. assert len(paths) == len(metadata)
  192. file_tree = FileTree()
  193. for path, md in zip(paths, metadata):
  194. file_tree.add_metadata(path, md)
  195. return file_tree
  196. def create_dataset_tree(dataset_tree_paths: Optional[List[MetadataPath]] = None,
  197. file_tree_paths: Optional[List[MetadataPath]] = None,
  198. initial_mrr: Optional[MetadataRootRecord] = None
  199. ) -> DatasetTree:
  200. dataset_tree_paths = dataset_tree_paths or default_dataset_tree_paths
  201. file_tree_paths = file_tree_paths or default_file_tree_paths
  202. dataset_tree = DatasetTree()
  203. for index, path in enumerate(dataset_tree_paths):
  204. metadata = Metadata()
  205. file_tree = create_file_tree(file_tree_paths)
  206. if initial_mrr is None:
  207. mrr = MetadataRootRecord(
  208. get_uuid(index),
  209. get_version(index),
  210. metadata,
  211. file_tree)
  212. else:
  213. mrr = initial_mrr
  214. dataset_tree.add_dataset(path, mrr)
  215. return dataset_tree
  216. class MMDummy:
  217. def __init__(self,
  218. info: str = "",
  219. is_mapped: bool = False,
  220. copied_from: Optional["MMDummy"] = None):
  221. self.info = info or f"MMDummy created at {time.time()}"
  222. self.mapped = is_mapped
  223. self.copied_from = copied_from
  224. def deepcopy(self, *args, **kwargs):
  225. return MMDummy(self.info, False, self)
  226. def read_in(self, backend_type="git") -> Any:
  227. self.mapped = True
  228. return self
  229. def write_out(self,
  230. destination: Optional[str] = None,
  231. backend_type: str = "git",
  232. force_write: bool = False) -> Reference:
  233. return Reference(
  234. "MMDummy",
  235. get_location(0x51))
  236. def purge(self, force: bool = False):
  237. self.mapped = False
  238. return