123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373 |
- #
- # MIT License
- #
- # Copyright (c) 2019 Keisuke Sehara
- #
- # Permission is hereby granted, free of charge, to any person obtaining a copy
- # of this software and associated documentation files (the "Software"), to deal
- # in the Software without restriction, including without limitation the rights
- # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- # copies of the Software, and to permit persons to whom the Software is
- # furnished to do so, subject to the following conditions:
- #
- # The above copyright notice and this permission notice shall be included in all
- # copies or substantial portions of the Software.
- #
- # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- # SOFTWARE.
- #
- import sys as _sys
- import json as _js
- import re as _re
- from pathlib import Path as _Path
- import collections as _cl
# Name of the JSON file (in the dataset root) that lists the available datasets.
DATASETS_METADATA_FILE = 'datasets_metadata.json'

# User-facing usage instructions, printed when metadata loading fails.
# BUGFIX: this must be an f-string -- the original was a plain string, so the
# '{DATASETS_METADATA_FILE}' placeholder below was never interpolated.
HOW_TO_USE = f"""
------
This 'helper.py' is written to work at the **root directory of the dataset**.
1. please make sure that the directory structure of the dataset remains unchanged
(you can miss data files, though).
2. please reposition this file inside the root directory (where you can find
'{DATASETS_METADATA_FILE}').
3. change the current directory to the root directory of the dataset.
4. from a Python session, run `import helper`.
"""

# Matches directory names that END with an ISO date, e.g. 'session2019-01-31'.
DATE_PATTERN = _re.compile(r'[0-9]{4}-[0-9]{2}-[0-9]{2}$')

# The directory containing this script; assumed to be the dataset root.
rootdir = _Path(__file__).parent
def __read_root_metadata(rootdir):
    """Load and parse the root metadata JSON file found under *rootdir*.

    Parameters
    ----------
    rootdir : path-like
        Dataset root directory; must contain DATASETS_METADATA_FILE.

    Returns
    -------
    collections.OrderedDict
        The parsed metadata, preserving the key order of the file.

    Raises
    ------
    RuntimeError
        If *rootdir* is not a directory or the metadata file is missing.
    """
    rootdir = _Path(rootdir)
    if not rootdir.is_dir():
        raise RuntimeError(f"not a directory: {rootdir}")
    metadata_file = rootdir / DATASETS_METADATA_FILE
    if not metadata_file.is_file():
        raise RuntimeError(f"not a file: {metadata_file}")
    # Be explicit about encoding: JSON is UTF-8 by convention, and relying on
    # the locale default breaks on non-UTF-8 platforms.
    with open(metadata_file, 'r', encoding='utf-8') as src:
        return _js.load(src, object_hook=_cl.OrderedDict)
def __errormsg(msg):
    """Write *msg*, followed by the usage instructions, to standard error."""
    _sys.stderr.write(f"***{msg} {HOW_TO_USE}\n")
# Module-level metadata cache; remains None when loading fails so that the
# rest of the module can detect the failure.
root_metadata = None
try:
    root_metadata = __read_root_metadata(rootdir)
except RuntimeError as e:
    # Report the problem (with usage instructions) but keep the module
    # importable so the user can still read the help text.
    __errormsg(f"failed to read from '{DATASETS_METADATA_FILE}' ({e})")
def describe_datasets(indent=2):
    """Print a human-readable summary of every dataset in the metadata.

    Parameters
    ----------
    indent : int or str
        Indentation unit; an int is converted to that many spaces.
    """
    if root_metadata is None:
        return __errormsg("metadata has not been initialized properly.")
    unit = (' ' * indent) if isinstance(indent, int) else indent
    if not root_metadata:
        print("***no datasets available in this directory!", file=_sys.stderr)
        return
    print("Available datasets")
    for ds_name, ds_desc in root_metadata.items():
        print(f"--------------------\n\ndataset '{ds_name}':")
        desc = ds_desc.get('description', None)
        if desc:
            print(f"{unit}(description)")
            print(f"{unit*2}{desc}")
        domains = ds_desc.get("domains", {})
        if not domains:
            print(f"{unit}(no available domains)")
            continue
        print(f"{unit}(domains)")
        for key, dom_desc in domains.items():
            suffix = dom_desc.get('suffix', '')
            if not suffix.strip():
                suffix = 'no suffix'
            desc = dom_desc.get('description', '(no description)')
            print(f"{unit*2}- domain '{key}' ({suffix})")
            print(f"{unit*3}{desc}")
- pathspec = _cl.namedtuple('pathspec', ('context', 'path'))
class context:
    """Base class for traversing and filtering the dataset directory tree.

    The tree layout is ``<rootdir>/<dataset>/<subject>/<date>/<domain>/<file>``;
    each ``get_*`` method descends one more level and keeps only the entries
    for which ``__validate__`` returns True.  Results are cached per instance;
    pass ``recalculate=True`` to force a re-scan of the filesystem.
    """
    # names usable as filter parameters via attribute access (e.g. ctx.subject)
    _parameters = ('dataset', 'subject', 'date', 'domain', 'file')
    # names usable to retrieve collected values via attribute access
    _retrievable = ('datasets', 'subjects', 'dates', 'domains', 'files')

    def __init__(self):
        # per-instance cache, e.g. 'datasets' -> [pathspec, ...] and
        # 'dataset_names' -> [str, ...]
        self.__cached = {}

    def __getattr__(self, name):
        """Expose parameters (filter builders) and retrievables dynamically."""
        if name in self._parameters:
            if name == 'file':
                raise NameError("use 'files' to retrieve file paths")
            return parameter(self, name)
        elif name in self._retrievable:
            return self.retrieve(name)
        # BUGFIX: the original fell through and silently returned None for any
        # other attribute; raise AttributeError as the protocol requires.
        raise AttributeError(name)

    def get_datasets(self, as_spec=True, recalculate=False):
        """Return datasets under the root that exist on disk and pass validation.

        Parameters
        ----------
        as_spec : bool
            If True return ``pathspec`` tuples, otherwise dataset names.
        recalculate : bool
            If True, bypass the cache and re-scan the filesystem.
        """
        if ('datasets' not in self.__cached) or recalculate:
            dss = []
            for ds_name in root_metadata.keys():
                spec = pathspec(dict(dataset=ds_name), rootdir / ds_name)
                # only keep datasets that actually exist as directories
                if spec.path.is_dir() and self.__validate__('dataset', ds_name):
                    dss.append(spec)
            self.__cached['datasets'] = dss
            self.__cached['dataset_names'] = [item.context['dataset'] for item in dss]
        if as_spec:
            return tuple(self.__cached['datasets'])
        return tuple(self.__cached['dataset_names'])

    def get_subjects(self, as_spec=True, recalculate=False):
        """Return subject directories (one level below each dataset)."""
        if ('subjects' not in self.__cached) or recalculate:
            subs = []
            for ds in self.get_datasets(as_spec=True, recalculate=recalculate):
                for child in ds.path.iterdir():
                    if not child.is_dir():
                        continue
                    if self.__validate__('subject', child.name):
                        cxt = ds.context.copy()
                        cxt['subject'] = child.name
                        subs.append(pathspec(cxt, child))
            self.__cached['subjects'] = subs
            self.__cached['subject_names'] = sorted(set(item.context['subject'] for item in subs))
        if as_spec:
            return tuple(self.__cached['subjects'])
        return tuple(self.__cached['subject_names'])

    def get_dates(self, as_spec=True, recalculate=False):
        """Return session directories whose names end with a YYYY-MM-DD date."""
        if ('dates' not in self.__cached) or recalculate:
            dates = []
            for sub in self.get_subjects(as_spec=True, recalculate=recalculate):
                for child in sub.path.iterdir():
                    if not child.is_dir():
                        continue
                    # only directories ending in an ISO date are sessions
                    is_session = DATE_PATTERN.search(child.name)
                    if not is_session:
                        continue
                    date = is_session.group(0)
                    if self.__validate__('date', date):
                        cxt = sub.context.copy()
                        cxt['date'] = date
                        dates.append(pathspec(cxt, child))
            self.__cached['dates'] = dates
            self.__cached['date_values'] = sorted(set(item.context['date'] for item in dates))
        if as_spec:
            return tuple(self.__cached['dates'])
        return tuple(self.__cached['date_values'])

    def get_domains(self, as_spec=True, recalculate=False):
        """Return domain directories (one level below each session date)."""
        if ('domains' not in self.__cached) or recalculate:
            doms = []
            for date in self.get_dates(as_spec=True, recalculate=recalculate):
                for child in date.path.iterdir():
                    if not child.is_dir():
                        continue
                    dom = child.name
                    if self.__validate__('domain', dom):
                        cxt = date.context.copy()
                        cxt['domain'] = dom
                        doms.append(pathspec(cxt, child))
            self.__cached['domains'] = doms
            self.__cached['domain_names'] = sorted(set(item.context['domain'] for item in doms))
        if as_spec:
            return tuple(self.__cached['domains'])
        return tuple(self.__cached['domain_names'])

    def get_files(self, as_spec=True, recalculate=False):
        """Return every entry inside each matched domain directory."""
        if ('files' not in self.__cached) or recalculate:
            files = []
            for dom in self.get_domains(as_spec=True, recalculate=recalculate):
                for child in dom.path.iterdir():
                    files.append(pathspec(dom.context.copy(), child))
            self.__cached['files'] = files
            self.__cached['file_paths'] = sorted(str(item.path) for item in files)
        if as_spec:
            return tuple(self.__cached['files'])
        return tuple(self.__cached['file_paths'])

    def retrieve(self, param, recalculate=False):
        """Retrieve the (name/value) collection for *param*.

        *param* must be one of ``_retrievable``; raises ValueError otherwise.
        """
        if root_metadata is None:
            # BUGFIX: the original called `__errormsg(...)` here, but inside a
            # class body that identifier is mangled to `_context__errormsg`,
            # which does not exist -> NameError.  Emit the same message
            # directly instead.
            print(f"***metadata has not been initialized properly. {HOW_TO_USE}",
                  file=_sys.stderr)
            return None
        options = dict(as_spec=False, recalculate=recalculate)
        getters = {
            'datasets': self.get_datasets,
            'subjects': self.get_subjects,
            'dates': self.get_dates,
            'domains': self.get_domains,
            'files': self.get_files,
        }
        try:
            getter = getters[param]
        except KeyError:
            # also fixes the original's typo: 'retieval' -> 'retrieval'
            raise ValueError(f"unknown object type for retrieval: {param}") from None
        return getter(**options)

    def __validate__(self, param, value):
        """Return True if (*param*, *value*) passes this context's filter.

        Subclasses must override this; the base class is abstract here.
        (Double trailing underscores keep the name exempt from mangling so
        subclass overrides dispatch correctly.)
        """
        raise NotImplementedError(f"{self.__class__.__name__}.__validate__")
class _datasets(context):
    """A context with no filtering: accepts every dataset/subject/date/etc."""

    def __repr__(self):
        return '<any>'

    def __validate__(self, param, value):
        # impose no constraints -- everything passes
        return True
class parameter:
    """A named comparison target within a context (e.g. ``ctx.dataset``).

    Comparing a ``parameter`` to a string with ``==``, ``!=``, ``<``, ``<=``,
    ``>`` or ``>=`` does not return a bool: it builds a ``conditional``
    filter object instead (a small query DSL).
    """

    def __init__(self, parent, name):
        self.__parent = parent   # the owning context (used only for repr)
        self.__name = name       # the parameter name, e.g. 'subject'

    def __getattr__(self, name):
        if name == 'name':
            # read-only access to the parameter's name
            return self.__name
        # BUGFIX: the original silently returned None for any other
        # attribute; raise AttributeError as the protocol requires.
        raise AttributeError(name)

    def __repr__(self):
        parent = repr(self.__parent)
        return f"{parent}.{self.__name}"

    def __cond__(self, op, name):
        """Build a ``conditional`` comparing this parameter against *name*."""
        if not isinstance(name, str):
            raise ValueError(f"cannot compare to {name.__class__} (expected a string)")
        return conditional(op, self, name)

    def __eq__(self, name):
        return self.__cond__('eq', name)

    def __ne__(self, name):
        return self.__cond__('ne', name)

    def __gt__(self, name):
        return self.__cond__('gt', name)

    def __lt__(self, name):
        return self.__cond__('lt', name)

    def __ge__(self, name):
        return self.__cond__('ge', name)

    def __le__(self, name):
        return self.__cond__('le', name)
class conditional(context):
    """A single comparison filter inside a context (e.g. ``subject == 'A'``).

    Constrains exactly one parameter; all other parameters validate as True.
    Conditionals combine with ``+`` (OR) and ``*`` (AND) into a ``joined``
    context.
    """
    # operator code -> symbol used in repr()
    _opcodes = dict(eq='==',
                    ne='!=',
                    gt='>',
                    ge='>=',
                    lt='<',
                    le='<=')
    # operator code -> comparison implementation
    _ops = {
        'eq': (lambda _x, _v: _x == _v),
        'ne': (lambda _x, _v: _x != _v),
        'gt': (lambda _x, _v: _x > _v),
        'ge': (lambda _x, _v: _x >= _v),
        'lt': (lambda _x, _v: _x < _v),
        'le': (lambda _x, _v: _x <= _v)
    }

    def __init__(self, op, param, value):
        super().__init__()
        self.__op = op          # operator code, key into _opcodes/_ops
        self.__param = param    # the `parameter` being constrained
        self.__value = value    # the string compared against

    def __getattr__(self, name):
        if name == 'opcode':
            opcode = self._opcodes.get(self.__op, None)
            if opcode:
                return opcode
            # BUGFIX: the original referenced an undefined local `op` here,
            # raising NameError instead of the intended ValueError.
            raise ValueError(f'unknown operation: {self.__op}')
        else:
            return super().__getattr__(name)

    def __join__(self, op, other):
        """Combine with another context: 'add' means OR, 'mul' means AND."""
        if not isinstance(other, context):
            raise ValueError(f"cannot join {other.__class__} (expected conditional or joined)")
        return joined(op, self, other)

    def __add__(self, other):
        return self.__join__('add', other)

    def __mul__(self, other):
        return self.__join__('mul', other)

    def __repr__(self):
        return f"({self.__param} {self.opcode} {repr(self.__value)})"

    def __validate__(self, param, value):
        """Return True unless *param* is the constrained one and fails."""
        if param != self.__param.name:
            # this conditional does not constrain other parameters
            return True
        op = self._ops.get(self.__op, None)
        if op:
            return op(value, self.__value)
        # BUGFIX: report the unknown operator code itself, not the None that
        # the failed dict lookup returned.
        raise ValueError(f'unknown operation: {self.__op}')
class joined(context):
    """Combines two contexts with OR ('add') or AND ('mul') semantics."""

    def __init__(self, op, set1, set2):
        super().__init__()
        self.__kind = op    # 'add' (OR) or 'mul' (AND)
        self.__lhs = set1
        self.__rhs = set2

    def __getattr__(self, name):
        if name != 'opcode':
            return super().__getattr__(name)
        op = self.__kind
        if op == 'mul':
            return '*'
        elif op == 'add':
            return '+'
        raise ValueError(f'unknown operation: {op}')

    def __repr__(self):
        return f"({self.__lhs} {self.opcode} {self.__rhs})"

    def __validate__(self, param, value):
        """True if both (mul) / either (add) sub-context accepts the value."""
        first = self.__lhs.__validate__(param, value)
        second = self.__rhs.__validate__(param, value)
        op = self.__kind
        if op == 'mul':
            return (first and second)
        elif op == 'add':
            return (first or second)
        raise ValueError(f'unknown operation: {op}')
### start script upon import
# Intended usage (cf. HOW_TO_USE) is `import helper` from the dataset root:
# on import, print the dataset summary and expose a ready-to-use, unfiltered
# `datasets` context object.
if __name__ != '__main__':
    if root_metadata is not None:
        describe_datasets()
    datasets = _datasets()
|