123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633 |
- # -*- coding: utf-8
- """
- Collects common base functionality
- """
- import collections
- import posixpath
- from . import terminology
- from .tools.doc_inherit import allow_inherit_docstring
- class BaseObject(object):
- _format = None
- def __hash__(self):
- """
- Allow all odML objects to be hash-able.
- """
- return id(self)
- def __eq__(self, obj):
- """
- Do a deep comparison of this object and its odml properties.
- The 'id' attribute of an object is excluded, since it is
- unique within a document.
- """
- # cannot compare totally different stuff
- if not isinstance(self, obj.__class__):
- return False
- for key in self._format:
- if key == "id" or key == "oid":
- continue
- elif getattr(self, key) != getattr(obj, key):
- return False
- return True
- def __ne__(self, obj):
- """
- Use the __eq__ function to determine if both objects are equal
- """
- return not self == obj
- def format(self):
- return self._format
- @property
- def document(self):
- """ Returns the Document object in which this object is contained """
- if self.parent is None:
- return None
- return self.parent.document
- def get_terminology_equivalent(self):
- """
- Returns the equivalent object in an terminology (should there be one
- defined) or None
- """
- return None
- def clean(self):
- """
- Stub that doesn't do anything for this class
- """
- pass
- def clone(self, children=True):
- """
- Clone this object recursively (if children is True) allowing to copy it
- independently to another document. If children is False, this acts as
- a template cloner, creating a copy of the object without any children
- """
- # TODO don't we need some recursion / deepcopy here?
- import copy
- obj = copy.copy(self)
- return obj
- class SmartList(list):
- def __init__(self, content_type):
- """
- Only values of the instance *content_type* can be added to the SmartList.
- """
- self._content_type = content_type
- super(SmartList, self).__init__()
- def __getitem__(self, key):
- """
- Provides element index also by searching for an element with a given name.
- """
- # Try normal list index first (for integers)
- if isinstance(key, int):
- return super(SmartList, self).__getitem__(key)
- # Otherwise search the list
- for obj in self:
- if (hasattr(obj, "name") and obj.name == key) or key == obj:
- return obj
- # and fail eventually
- raise KeyError(key)
- def __setitem__(self, key, value):
- """
- Replaces item at list[*key*] with *value*.
- :param key: index position
- :param value: object that replaces item at *key* position.
- value has to be of the same content type as the list.
- In this context usually a Section or a Property.
- """
- if not isinstance(value, self._content_type):
- raise ValueError("List only supports elements of type '%s'" %
- self._content_type)
- # If required remove new object from its old parents child-list
- if hasattr(value, "_parent") and (value._parent and value in value._parent):
- value._parent.remove(value)
- # If required move parent reference from replaced to new object
- # and set parent reference on replaced object None.
- if hasattr(self[key], "_parent"):
- value._parent = self[key]._parent
- self[key]._parent = None
- super(SmartList, self).__setitem__(key, value)
- def __contains__(self, key):
- for obj in self:
- if (hasattr(obj, "name") and obj.name == key) or key == obj:
- return True
- def __eq__(self, obj):
- """
- SmartList attributes of 'sections' and 'properties' are
- handled specially: We want to make sure that the lists'
- objects are properly compared without changing the order
- of the individual lists.
- """
- # This special case was introduced only due to the fact
- # that RDF files will be loaded with randomized list
- # order. With any other file format the list order
- # remains unchanged.
- if sorted(self, key=lambda x: x.name) != sorted(obj, key=lambda x: x.name):
- return False
- return True
- def __ne__(self, obj):
- """
- Use the __eq__ function to determine if both objects are equal
- """
- return not self == obj
- def index(self, obj):
- """
- Find obj in list
- """
- for i, e in enumerate(self):
- if e is obj:
- return i
- raise ValueError("remove: %s not in list" % repr(obj))
- def remove(self, obj):
- """
- Remove an element from this list.
- """
- del self[self.index(obj)]
- def append(self, *obj_tuple):
- for obj in obj_tuple:
- if obj.name in self:
- raise KeyError(
- "Object with the same name already exists! " + str(obj))
- if not isinstance(obj, self._content_type):
- raise ValueError("List only supports elements of type '%s'" %
- self._content_type)
- super(SmartList, self).append(obj)
- def sort(self, key=lambda x: x.name, reverse=False):
- """
- If not otherwise defined, sort by the *name* attribute
- of the lists *_content_type* object.
- """
- super(SmartList, self).sort(key=key, reverse=reverse)
- @allow_inherit_docstring
- class Sectionable(BaseObject):
- def __init__(self):
- from odml.section import BaseSection
- self._sections = SmartList(BaseSection)
- self._repository = None
- def __getitem__(self, key):
- return self._sections[key]
- def __len__(self):
- return len(self._sections)
- def __iter__(self):
- return self._sections.__iter__()
- @property
- def document(self):
- """
- Returns the parent-most node (if its a document instance) or None
- """
- from odml.doc import BaseDocument
- p = self
- while p.parent:
- p = p.parent
- if isinstance(p, BaseDocument):
- return p
- @property
- def sections(self):
- """ The list of sections contained in this section/document """
- return self._sections
- def insert(self, position, section):
- """
- Insert a Section at the child-list position. A ValueError will be raised,
- if a Section with the same name already exists in the child-list.
- :param position: index at which the object should be inserted.
- :param section: odML Section object.
- """
- from odml.section import BaseSection
- if isinstance(section, BaseSection):
- if section.name in self._sections:
- raise ValueError("Section with name '%s' already exists." % section.name)
- self._sections.insert(position, section)
- section._parent = self
- else:
- raise ValueError("Can only insert objects of type Section.")
- def append(self, section):
- """
- Method appends a single Section to the section child-lists of the current Object.
- :param section: odML Section object.
- """
- from odml.section import BaseSection
- if isinstance(section, BaseSection):
- self._sections.append(section)
- section._parent = self
- elif isinstance(section, collections.Iterable) and not isinstance(section, str):
- raise ValueError("Use extend to add a list of Sections.")
- else:
- raise ValueError("Can only append objects of type Section.")
- def extend(self, sec_list):
- """
- Method adds Sections to the section child-list of the current object.
- :param sec_list: Iterable containing odML Section entries.
- """
- from odml.section import BaseSection
- if not isinstance(sec_list, collections.Iterable):
- raise TypeError("'%s' object is not iterable" % type(sec_list).__name__)
- # Make sure only Sections with unique names will be added.
- for sec in sec_list:
- if not isinstance(sec, BaseSection):
- raise ValueError("Can only extend objects of type Section.")
- elif isinstance(sec, BaseSection) and sec.name in self._sections:
- raise KeyError("Section with name '%s' already exists." % sec.name)
- for sec in sec_list:
- self.append(sec)
- def remove(self, section):
- """ Removes the specified child-section """
- self._sections.remove(section)
- section._parent = None
- def itersections(self, recursive=True, yield_self=False,
- filter_func=lambda x: True, max_depth=None):
- """
- Iterate each child section
- Example: return all subsections which name contains "foo"
- >>> filter_func = lambda x: getattr(x, 'name').find("foo") > -1
- >>> sec_or_doc.itersections(filter_func=filter_func)
- :param recursive: iterate all child sections recursively (deprecated)
- :type recursive: bool
- :param yield_self: includes itself in the iteration
- :type yield_self: bool
- :param filter_func: accepts a function that will be applied to each
- iterable. Yields iterable if function returns True
- :type filter_func: function
- """
- stack = []
- # Below: never yield self if self is a Document
- if self == self.document and ((max_depth is None) or (max_depth > 0)):
- for sec in self.sections:
- stack.append((sec, 1)) # (<section>, <level in a tree>)
- elif self != self.document:
- stack.append((self, 0)) # (<section>, <level in a tree>)
- while len(stack) > 0:
- (sec, level) = stack.pop(0)
- if filter_func(sec) and (yield_self if level == 0 else True):
- yield sec
- if max_depth is None or level < max_depth:
- for sec in sec.sections:
- stack.append((sec, level + 1))
- def iterproperties(self, max_depth=None, filter_func=lambda x: True):
- """
- Iterate each related property (recursively)
- Example: return all children properties which name contains "foo"
- >>> filter_func = lambda x: getattr(x, 'name').find("foo") > -1
- >>> sec_or_doc.iterproperties(filter_func=filter_func)
- :param max_depth: iterate all properties recursively if None, only to
- a certain level otherwise
- :type max_depth: bool
- :param filter_func: accepts a function that will be applied to each
- iterable. Yields iterable if function returns True
- :type filter_func: function
- """
- for sec in [s for s in self.itersections(max_depth=max_depth,
- yield_self=True)]:
- if hasattr(sec, "properties"): # not to fail if odml.Document
- for i in sec.properties:
- if filter_func(i):
- yield i
- def itervalues(self, max_depth=None, filter_func=lambda x: True):
- """
- Iterate each related value (recursively)
- # Example: return all children values which string converted version
- has "foo"
- >>> filter_func = lambda x: str(x).find("foo") > -1
- >>> sec_or_doc.itervalues(filter_func=filter_func)
- :param max_depth: iterate all properties recursively if None, only to
- a certain level otherwise
- :type max_depth: bool
- :param filter_func: accepts a function that will be applied to each
- iterable. Yields iterable if function returns True
- :type filter_func: function
- """
- for prop in [p for p in self.iterproperties(max_depth=max_depth)]:
- if filter_func(prop.values):
- yield prop.values
- def contains(self, obj):
- """
- Checks if a subsection of name&type of *obj* is a child of this section
- if so return this child
- """
- for i in self._sections:
- if obj.name == i.name and obj.type == i.type:
- return i
- def _matches(self, obj, key=None, otype=None, include_subtype=False):
- """
- Find out
- * if the *key* matches obj.name (if key is not None)
- * or if *otype* matches obj.type (if type is not None)
- * if type does not match exactly, test for subtype.
- (e.g.stimulus/white_noise)
- comparisons are case-insensitive, however both key and type
- MUST be lower-case.
- """
- name_match = (key is None or (
- key is not None and hasattr(obj, "name") and obj.name == key))
- exact_type_match = (otype is None or (otype is not None and
- hasattr(obj, "type") and
- obj.type.lower() == otype))
- if not include_subtype:
- return name_match and exact_type_match
- subtype_match = (otype is None or
- (otype is not None and hasattr(obj, "type") and
- otype in obj.type.lower().split('/')[:-1]))
- return name_match and (exact_type_match or subtype_match)
- def get_section_by_path(self, path):
- """
- Find a Section through a path like "../name1/name2"
- :param path: path like "../name1/name2"
- :type path: str
- """
- return self._get_section_by_path(path)
- def get_property_by_path(self, path):
- """
- Find a Property through a path like "../name1/name2:property_name"
- :param path: path like "../name1/name2:property_name"
- :type path: str
- """
- laststep = path.split(":") # assuming section names do not contain :
- found = self._get_section_by_path(laststep[0])
- return self._match_iterable(found.properties, ":".join(laststep[1:]))
- def _match_iterable(self, iterable, key):
- """
- Searches for a key match within a given iterable.
- Raises ValueError if not found.
- """
- for obj in iterable:
- if self._matches(obj, key):
- return obj
- raise ValueError("Object named '%s' does not exist" % key)
- def _get_section_by_path(self, path):
- """
- Returns a Section by a given path.
- Raises ValueError if not found.
- """
- if path.startswith("/"):
- if len(path) == 1:
- raise ValueError("Not a valid path")
- doc = self.document
- if doc is not None:
- return doc._get_section_by_path(path[1:])
- raise ValueError(
- "A section with no Document cannot resolve absolute path")
- pathlist = path.split("/")
- if len(pathlist) > 1:
- if pathlist[0] == "..":
- found = self.parent
- elif pathlist[0] == ".":
- found = self
- else:
- found = self._match_iterable(self.sections, pathlist[0])
- if found:
- return found._get_section_by_path("/".join(pathlist[1:]))
- raise ValueError("Section named '%s' does not exist" % pathlist[0])
- else:
- return self._match_iterable(self.sections, pathlist[0])
- def find(self, key=None, type=None, findAll=False, include_subtype=False):
- """ Return the first subsection named *key* of type *type* """
- ret = []
- if type:
- type = type.lower()
- for s in self._sections:
- if self._matches(s, key, type, include_subtype=include_subtype):
- if findAll:
- ret.append(s)
- else:
- return s
- if ret:
- return ret
- def find_related(self, key=None, type=None, children=True, siblings=True,
- parents=True, recursive=True, findAll=False):
- """
- Finds a related section named *key* and/or *type*
- * by searching its children’s children if *children* is True
- if *recursive* is true all leave nodes will be searched
- * by searching its siblings if *siblings* is True
- * by searching the parent element if *parents* is True
- if *recursive* is True all parent nodes until the root are searched
- * if *findAll* is True, returns a list of all matching objects
- """
- ret = []
- if type:
- type = type.lower()
- if children:
- for section in self._sections:
- if self._matches(section, key, type):
- if findAll:
- ret.append(section)
- else:
- return section
- if recursive:
- obj = section.find_related(key, type, children,
- siblings=False, parents=False,
- recursive=recursive,
- findAll=findAll)
- if obj is not None:
- if findAll:
- ret += obj
- else:
- return obj
- if siblings and self.parent is not None:
- obj = self.parent.find(key, type, findAll)
- if obj is not None:
- if findAll:
- ret += obj
- else:
- return obj
- if parents:
- obj = self
- while obj.parent is not None:
- obj = obj.parent
- if self._matches(obj, key, type):
- if findAll:
- ret.append(obj)
- else:
- return obj
- if not recursive:
- break
- if ret:
- return ret
- return None
- def get_path(self):
- """
- Returns the absolute path of this section
- """
- node = self
- path = []
- while node.parent is not None:
- path.insert(0, node.name)
- node = node.parent
- return "/" + "/".join(path)
- @staticmethod
- def _get_relative_path(a, b):
- """
- Returns a relative path for navigation from dir *a* to dir *b*
- If the common parent of both is "/", return an absolute path
- """
- a += "/"
- b += "/"
- parent = posixpath.dirname(posixpath.commonprefix([a, b]))
- if parent == "/":
- return b[:-1]
- a = posixpath.relpath(a, parent)
- b = posixpath.relpath(b, parent)
- if a == ".":
- return b
- return posixpath.normpath("../" * (a.count("/") + 1) + b)
- def get_relative_path(self, section):
- """
- Returns a relative (file)path to point to section
- like (e.g. ../other_section)
- If the common parent of both sections is the document (i.e. /),
- return an absolute path
- """
- a = self.get_path()
- b = section.get_path()
- return self._get_relative_path(a, b)
- def clean(self):
- """
- Runs clean() on all immediate child-sections causing any resolved links
- or includes to be unresolved.
- This should be called for the document prior to saving.
- """
- for i in self:
- i.clean()
- def clone(self, children=True, keep_id=False):
- """
- Clone this object recursively allowing to copy it independently
- to another document
- """
- from odml.section import BaseSection
- obj = super(Sectionable, self).clone(children)
- obj._parent = None
- obj._sections = SmartList(BaseSection)
- if children:
- for s in self._sections:
- obj.append(s.clone(keep_id=keep_id))
- return obj
- @property
- def repository(self):
- """ A URL to a terminology. """
- return self._repository
- @repository.setter
- def repository(self, url):
- if not url:
- url = None
- self._repository = url
- if url:
- terminology.deferred_load(url)
- def get_repository(self):
- """
- Return the current applicable repository (may be inherited from a
- parent) or None
- """
- return self._repository
- def create_section(self, name, type="undefined", oid=None):
- """
- Creates a new subsection that is a child of this section.
- :param name: The name of the section to create.
- :param type: The type of the section.
- :param oid: object id, UUID string as specified in RFC 4122. If no id
- is provided, an id will be generated and assigned.
- :return: The new section.
- """
- from odml.section import BaseSection
- sec = BaseSection(name=name, type=type, oid=oid)
- sec.parent = self
- return sec
|