mtreenode.py 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191
  1. from typing import (
  2. Any,
  3. Iterable,
  4. List,
  5. Optional,
  6. Tuple,
  7. Union,
  8. )
  9. from dataladmetadatamodel.mappableobject import (
  10. MappableObject,
  11. ensure_mapped,
  12. )
  13. from dataladmetadatamodel.metadatapath import MetadataPath
  14. from dataladmetadatamodel.mapper.reference import Reference
  15. class MTreeNode(MappableObject):
  16. def __init__(self,
  17. leaf_class: Any,
  18. realm: Optional[str] = None,
  19. reference: Optional[Reference] = None):
  20. assert isinstance(reference, (type(None), Reference))
  21. super().__init__(realm, reference)
  22. self.leaf_class = leaf_class
  23. self.leaf_class_name = leaf_class.__name__
  24. self.child_nodes = dict()
  25. def __contains__(self, path: MetadataPath) -> bool:
  26. return self.get_child(path) is not None
  27. def __str__(self):
  28. return f"<{type(self).__name__}: children: {self.child_nodes.keys()}>"
  29. def modifiable_sub_objects_impl(self) -> Iterable["MappableObject"]:
  30. yield from self.child_nodes.values()
  31. def purge_impl(self):
  32. for child_node in self.child_nodes.values():
  33. child_node.purge()
  34. self.child_nodes = dict()
  35. def deepcopy_impl(self,
  36. new_mapper_family: Optional[str] = None,
  37. new_destination: Optional[str] = None,
  38. **kwargs) -> "MappableObject":
  39. copied_mtree_node = MTreeNode(self.leaf_class)
  40. for child_name, child_node in self.child_nodes.items():
  41. copied_mtree_node.add_child(
  42. child_name,
  43. child_node.deepcopy(
  44. new_mapper_family,
  45. new_destination))
  46. copied_mtree_node.write_out(new_destination)
  47. copied_mtree_node.purge()
  48. return copied_mtree_node
  49. def contains_child(self,
  50. child_name: Union[str, MetadataPath]
  51. ) -> bool:
  52. return self.get_child(child_name) is not None
  53. def get_child(self,
  54. child_name: Union[str, MetadataPath]
  55. ) -> Optional[MappableObject]:
  56. self.ensure_mapped()
  57. return self.child_nodes.get(str(child_name), None)
  58. def add_child(self, node_name: str, child: MappableObject):
  59. """
  60. Add a sub-element with the given name. The sub-element
  61. can be an MTreeNode or any other MappableObject (usually
  62. it will be an MTreeNode, Metadata, or MetadataRootRecord.
  63. """
  64. self._add_children([(node_name, child)])
  65. def _add_children(self,
  66. children: List[Tuple[str, MappableObject]]):
  67. # Assert that no name is "." or contains a "/", and
  68. # assert that there are no duplicated names in the input
  69. assert all(map(lambda nn: nn[0] != ".", children))
  70. assert all(map(lambda nn: nn[0].find("/") < 0, children))
  71. assert len(set(map(lambda nn: nn[0], children))) == len(children)
  72. new_names = set(map(lambda nn: nn[0], children))
  73. self.ensure_mapped()
  74. duplicated_names = set(self.child_nodes.keys()) & new_names
  75. if duplicated_names:
  76. raise ValueError(
  77. "Child node with the following name(s) already exist(s): "
  78. + ", ".join(duplicated_names))
  79. self.touch()
  80. self.child_nodes = {
  81. **self.child_nodes,
  82. **dict(children)
  83. }
  84. def add_child_at(self,
  85. child: MappableObject,
  86. path: MetadataPath):
  87. assert len(path) > 0, "child name required"
  88. path_element = path.parts[0]
  89. existing_child = self.get_child(path_element)
  90. if len(path) == 1:
  91. if isinstance(existing_child, MTreeNode):
  92. raise ValueError(
  93. f"tree-node named {path_element} exists: "
  94. f"{existing_child}, cannot be converted "
  95. f"to {child}")
  96. # Add the final object to self
  97. self.add_child(path.parts[0], child)
  98. else:
  99. if existing_child is None:
  100. # Create an intermediate node, if it doesn't exist
  101. existing_child = MTreeNode(self.leaf_class)
  102. self.add_child(path_element, existing_child)
  103. if not isinstance(existing_child, MTreeNode):
  104. raise ValueError(
  105. f"non tree-node named {path_element} exists: "
  106. f"{existing_child}, cannot be converted to a "
  107. f"tree-node")
  108. existing_child.add_child_at(
  109. child,
  110. MetadataPath(
  111. "/".join(path.parts[1:])))
  112. def remove_child(self,
  113. child_name: str):
  114. self.ensure_mapped()
  115. self.touch()
  116. del self.child_nodes[child_name]
  117. def remove_child_at(self,
  118. path: MetadataPath):
  119. containing_path = MetadataPath("/".join(path.parts[:-1]))
  120. containing_node = self.get_object_at_path(containing_path)
  121. containing_node.remove_child(path.parts[-1])
  122. def get_object_at_path(self,
  123. path: Optional[MetadataPath] = None
  124. ) -> Optional["MTreeNode"]:
  125. # Linear search for path
  126. path = path or MetadataPath("")
  127. current_node = self
  128. for element in path.parts:
  129. if not isinstance(current_node, MTreeNode):
  130. return None
  131. current_node = current_node.get_child(element)
  132. if current_node is None:
  133. return None
  134. return current_node
  135. def get_paths_recursive(self,
  136. show_intermediate: Optional[bool] = False
  137. ) -> Iterable[Tuple[MetadataPath, "MappableObject"]]:
  138. if show_intermediate:
  139. yield MetadataPath(""), self
  140. with ensure_mapped(self):
  141. for child_name, child_node in self.child_nodes.items():
  142. if not isinstance(child_node, MTreeNode):
  143. yield MetadataPath(child_name), child_node
  144. else:
  145. for sub_path, tree_node in child_node.get_paths_recursive(
  146. show_intermediate):
  147. yield MetadataPath(child_name) / sub_path, tree_node
  148. @staticmethod
  149. def is_root_path(path: MetadataPath) -> bool:
  150. return len(path.parts) == 0