|
@@ -0,0 +1,373 @@
|
|
|
+#
|
|
|
+# MIT License
|
|
|
+#
|
|
|
+# Copyright (c) 2019 Keisuke Sehara
|
|
|
+#
|
|
|
+# Permission is hereby granted, free of charge, to any person obtaining a copy
|
|
|
+# of this software and associated documentation files (the "Software"), to deal
|
|
|
+# in the Software without restriction, including without limitation the rights
|
|
|
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
|
|
+# copies of the Software, and to permit persons to whom the Software is
|
|
|
+# furnished to do so, subject to the following conditions:
|
|
|
+#
|
|
|
+# The above copyright notice and this permission notice shall be included in all
|
|
|
+# copies or substantial portions of the Software.
|
|
|
+#
|
|
|
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
|
|
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
|
|
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
|
|
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
|
|
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
|
|
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
|
|
+# SOFTWARE.
|
|
|
+#
|
|
|
+
|
|
|
+import sys as _sys
|
|
|
+import json as _js
|
|
|
+import re as _re
|
|
|
+from pathlib import Path as _Path
|
|
|
+import collections as _cl
|
|
|
+
|
|
|
#: name of the JSON file that must exist at the root directory of the dataset.
DATASETS_METADATA_FILE = 'datasets_metadata.json'

# NOTE(fix): this must be an f-string; the original plain string printed the
# literal text '{DATASETS_METADATA_FILE}' to the user instead of the actual
# metadata file name.
HOW_TO_USE = f"""

------

This 'helper.py' is written to work at the **root directory of the dataset**.

1. please make sure that the directory structure of the dataset remains unchanged
   (you can miss data files, though).
2. please reposition this file inside the root directory (where you can find
   '{DATASETS_METADATA_FILE}').
3. change the current directory to the root directory of the dataset.
4. from a Python session, run `import helper`.
"""

# matches directory names that end in an ISO-style date, e.g. '2019-01-31'
DATE_PATTERN = _re.compile(r'[0-9]{4}-[0-9]{2}-[0-9]{2}$')

# the directory this helper file lives in (assumed to be the dataset root)
rootdir = _Path(__file__).parent
|
|
|
+
|
|
|
def __read_root_metadata(rootdir):
    """Parse '<rootdir>/datasets_metadata.json' and return its contents.

    Raises RuntimeError when `rootdir` is not a directory, or when the
    metadata file is missing from it.
    """
    rootdir = _Path(rootdir)
    if not rootdir.is_dir():
        raise RuntimeError(f"not a directory: {rootdir}")
    metadata_file = rootdir / DATASETS_METADATA_FILE
    if not metadata_file.is_file():
        raise RuntimeError(f"not a file: {metadata_file}")
    # OrderedDict preserves the key order as written in the JSON file
    with metadata_file.open('r') as src:
        return _js.load(src, object_hook=_cl.OrderedDict)
|
|
|
+
|
|
|
def __errormsg(msg):
    """Write `msg` to stderr, followed by the usage instructions."""
    print(f"***{msg} {HOW_TO_USE}", file=_sys.stderr)
|
|
|
+
|
|
|
# module-level metadata cache; stays None when loading fails so the other
# helpers can detect (and report) the failure instead of crashing.
root_metadata = None

try:
    root_metadata = __read_root_metadata(rootdir)
except RuntimeError as e:
    # tell the user how to set this helper up rather than raising at import
    __errormsg(f"failed to read from '{DATASETS_METADATA_FILE}' ({e})")
|
|
|
+
|
|
|
def describe_datasets(indent=2):
    """Print a human-readable summary of every dataset in `root_metadata`.

    `indent` may be an int (number of spaces per level) or a string used
    as the indentation unit.  Errors are reported on stderr.
    """
    if root_metadata is None:
        return __errormsg("metadata has not been initialized properly.")
    if isinstance(indent, int):
        indent = ' ' * indent
    if not root_metadata:
        print("***no datasets available in this directory!", file=_sys.stderr)
        return

    print("Available datasets")
    for ds_name, ds_desc in root_metadata.items():
        print(f"--------------------\n\ndataset '{ds_name}':")

        description = ds_desc.get('description')
        if description:
            print(f"{indent}(description)")
            print(f"{indent*2}{description}")

        domains = ds_desc.get("domains", {})
        if not domains:
            print(f"{indent}(no available domains)")
            continue
        print(f"{indent}(domains)")
        for key, dom_desc in domains.items():
            suffix = dom_desc.get('suffix', '')
            if not suffix.strip():
                # blank / whitespace-only suffixes are displayed specially
                suffix = 'no suffix'
            dom_description = dom_desc.get('description', '(no description)')
            print(f"{indent*2}- domain '{key}' ({suffix})")
            print(f"{indent*3}{dom_description}")
|
|
|
+
|
|
|
# a resolved location inside the dataset tree: `context` is a dict of the
# parameter values (dataset/subject/date/domain) that lead to `path`.
pathspec = _cl.namedtuple('pathspec', ['context', 'path'])
|
|
|
+
|
|
|
class context:
    """Base class for building dataset queries.

    A context decides which entries of the directory tree
    (dataset -> subject -> session date -> domain -> file) are visible.
    Subclasses implement ``__validate__(param, value)`` to accept or reject
    a candidate value; the ``get_*`` methods walk the tree, keep only the
    entries that validate, and cache the results per instance.
    """

    # attribute names that yield `parameter` query objects
    _parameters = ('dataset', 'subject', 'date', 'domain', 'file')
    # attribute names whose access triggers retrieval of matching entries
    _retrievable = ('datasets', 'subjects', 'dates', 'domains', 'files')

    def __init__(self):
        # maps collection name -> computed specs / names
        self.__cached = {}

    def __getattr__(self, name):
        """Expose `_parameters` as query objects and `_retrievable` as results."""
        if name in self._parameters:
            if name == 'file':
                # NameError kept for backward compatibility with callers
                raise NameError("use 'files' to retrieve file paths")
            return parameter(self, name)
        elif name in self._retrievable:
            return self.retrieve(name)
        # fail loudly instead of silently returning None for unknown names
        raise AttributeError(name)

    def get_datasets(self, as_spec=True, recalculate=False):
        """Return the datasets that exist on disk and pass validation.

        as_spec=True yields `pathspec` tuples, otherwise dataset names.
        Results are cached; pass recalculate=True to re-scan the disk.
        """
        if ('datasets' not in self.__cached) or recalculate:
            dss = []
            for ds_name in root_metadata.keys():
                spec = pathspec(dict(dataset=ds_name), rootdir / ds_name)
                # only keep datasets that exist as directories and validate
                if spec.path.is_dir() and self.__validate__('dataset', ds_name):
                    dss.append(spec)
            self.__cached['datasets'] = dss
            self.__cached['dataset_names'] = [item.context['dataset'] for item in dss]
        if as_spec:
            return tuple(self.__cached['datasets'])
        else:
            return tuple(self.__cached['dataset_names'])

    def get_subjects(self, as_spec=True, recalculate=False):
        """Return validated subject directories found inside the datasets.

        as_spec=True yields `pathspec` tuples; otherwise the sorted, unique
        subject names.
        """
        if ('subjects' not in self.__cached) or recalculate:
            subs = []
            for ds in self.get_datasets(as_spec=True, recalculate=recalculate):
                for child in ds.path.iterdir():
                    if not child.is_dir():
                        continue
                    if self.__validate__('subject', child.name):
                        cxt = ds.context.copy()
                        cxt['subject'] = child.name
                        subs.append(pathspec(cxt, child))
            self.__cached['subjects'] = subs
            self.__cached['subject_names'] = sorted(set(item.context['subject'] for item in subs))
        if as_spec:
            return tuple(self.__cached['subjects'])
        else:
            return tuple(self.__cached['subject_names'])

    def get_dates(self, as_spec=True, recalculate=False):
        """Return validated session directories (named with a trailing date).

        as_spec=True yields `pathspec` tuples; otherwise the sorted, unique
        date strings.
        """
        if ('dates' not in self.__cached) or recalculate:
            dates = []
            for sub in self.get_subjects(as_spec=True, recalculate=recalculate):
                for child in sub.path.iterdir():
                    if not child.is_dir():
                        continue
                    # session directories must end with a YYYY-MM-DD date
                    is_session = DATE_PATTERN.search(child.name)
                    if not is_session:
                        continue
                    date = is_session.group(0)
                    if self.__validate__('date', date):
                        cxt = sub.context.copy()
                        cxt['date'] = date
                        dates.append(pathspec(cxt, child))
            self.__cached['dates'] = dates
            self.__cached['date_values'] = sorted(set(item.context['date'] for item in dates))
        if as_spec:
            return tuple(self.__cached['dates'])
        else:
            return tuple(self.__cached['date_values'])

    def get_domains(self, as_spec=True, recalculate=False):
        """Return validated domain directories found inside the sessions.

        as_spec=True yields `pathspec` tuples; otherwise the sorted, unique
        domain names.
        """
        if ('domains' not in self.__cached) or recalculate:
            doms = []
            for date in self.get_dates(as_spec=True, recalculate=recalculate):
                for child in date.path.iterdir():
                    if not child.is_dir():
                        continue
                    dom = child.name
                    if self.__validate__('domain', dom):
                        cxt = date.context.copy()
                        cxt['domain'] = dom
                        doms.append(pathspec(cxt, child))
            self.__cached['domains'] = doms
            self.__cached['domain_names'] = sorted(set(item.context['domain'] for item in doms))
        if as_spec:
            return tuple(self.__cached['domains'])
        else:
            return tuple(self.__cached['domain_names'])

    def get_files(self, as_spec=True, recalculate=False):
        """Return every file found inside the matching domain directories.

        as_spec=True yields `pathspec` tuples; otherwise the sorted path
        strings.  Note: files themselves are not validated.
        """
        if ('files' not in self.__cached) or recalculate:
            files = []
            for dom in self.get_domains(as_spec=True, recalculate=recalculate):
                for child in dom.path.iterdir():
                    files.append(pathspec(dom.context.copy(), child))
            self.__cached['files'] = files
            self.__cached['file_paths'] = sorted(str(item.path) for item in files)
        if as_spec:
            return tuple(self.__cached['files'])
        else:
            return tuple(self.__cached['file_paths'])

    def retrieve(self, param, recalculate=False):
        """Return names/paths (not specs) for the collection named `param`.

        Raises ValueError for an unknown collection name.
        """
        if root_metadata is None:
            # BUG FIX: the module-level `__errormsg` cannot be referenced
            # from inside a class body (name mangling turned it into
            # `_context__errormsg`, a NameError) -- print directly instead.
            print(f"***metadata has not been initialized properly. {HOW_TO_USE}",
                  file=_sys.stderr)
            return None

        getters = {
            'datasets': self.get_datasets,
            'subjects': self.get_subjects,
            'dates': self.get_dates,
            'domains': self.get_domains,
            'files': self.get_files,
        }
        try:
            getter = getters[param]
        except KeyError:
            # typo fixed: "retieval" -> "retrieval"
            raise ValueError(f"unknown object type for retrieval: {param}") from None
        return getter(as_spec=False, recalculate=recalculate)

    def __validate__(self, param, value):
        """Subclass hook: return True if `value` is acceptable for `param`."""
        raise NotImplementedError(f"{self.__class__.__name__}.__validate__")
|
|
|
+
|
|
|
class _datasets(context):
    """The root query context: imposes no restriction on any parameter."""

    def __init__(self):
        super().__init__()

    def __repr__(self):
        return '<any>'

    def __validate__(self, param, value):
        # accept everything; conditional/joined contexts narrow this down
        return True
|
|
|
+
|
|
|
class parameter:
    """A single query parameter of a context (e.g. ``ctx.date``).

    Comparing a parameter against a string with ==, !=, <, <=, > or >=
    does not return a bool: it builds a `conditional` context that filters
    dataset entries by that comparison.
    """

    def __init__(self, parent, name):
        self.__parent = parent  # the context this parameter belongs to
        self.__name = name      # parameter name, e.g. 'dataset' or 'date'

    def __getattr__(self, name):
        if name == 'name':
            return self.__name
        # BUG FIX: used to fall through and silently return None for any
        # other attribute, masking typos -- fail loudly instead.
        raise AttributeError(name)

    def __repr__(self):
        parent = repr(self.__parent)
        return f"{parent}.{self.__name}"

    def __cond__(self, op, name):
        """Build a `conditional` comparing this parameter to the string `name`."""
        if not isinstance(name, str):
            raise ValueError(f"cannot compare to {name.__class__} (expected a string)")
        return conditional(op, self, name)

    def __eq__(self, name):
        return self.__cond__('eq', name)

    def __ne__(self, name):
        return self.__cond__('ne', name)

    def __gt__(self, name):
        return self.__cond__('gt', name)

    def __lt__(self, name):
        return self.__cond__('lt', name)

    def __ge__(self, name):
        return self.__cond__('ge', name)

    def __le__(self, name):
        return self.__cond__('le', name)
|
|
|
+
|
|
|
class conditional(context):
    """A context that filters one parameter by comparison with a string.

    Built by comparing a `parameter` against a string, e.g.
    ``ctx.date >= '2019-01-01'``.  Values of any other parameter always
    pass validation.  Conditionals combine with ``+`` (OR) and ``*`` (AND),
    yielding a `joined` context.
    """

    # operation code -> display symbol (used by __repr__ via `opcode`)
    _opcodes = dict(eq='==',
                    ne='!=',
                    gt='>',
                    ge='>=',
                    lt='<',
                    le='<=')
    # operation code -> comparison implementation
    _ops = {
        'eq': (lambda _x, _v: _x == _v),
        'ne': (lambda _x, _v: _x != _v),
        'gt': (lambda _x, _v: _x > _v),
        'ge': (lambda _x, _v: _x >= _v),
        'lt': (lambda _x, _v: _x < _v),
        'le': (lambda _x, _v: _x <= _v)
    }

    def __init__(self, op, param, value):
        super().__init__()
        self.__op = op        # operation code, a key of _opcodes/_ops
        self.__param = param  # the `parameter` object being constrained
        self.__value = value  # the string being compared against

    def __getattr__(self, name):
        if name == 'opcode':
            opcode = self._opcodes.get(self.__op, None)
            if opcode:
                return opcode
            else:
                # BUG FIX: referenced an undefined local `op` here, which
                # raised NameError instead of the intended ValueError.
                raise ValueError(f'unknown operation: {self.__op}')
        else:
            return super().__getattr__(name)

    def __join__(self, op, other):
        """Combine with another context via `joined` ('add' = OR, 'mul' = AND)."""
        if not isinstance(other, context):
            raise ValueError(f"cannot join {other.__class__} (expected conditional or joined)")
        return joined(op, self, other)

    def __add__(self, other):
        return self.__join__('add', other)

    def __mul__(self, other):
        return self.__join__('mul', other)

    def __repr__(self):
        return f"({self.__param} {self.opcode} {repr(self.__value)})"

    def __validate__(self, param, value):
        """Apply the comparison when `param` matches; other params always pass."""
        if param != self.__param.name:
            return True
        op = self._ops.get(self.__op, None)
        if op:
            return op(value, self.__value)
        else:
            # BUG FIX: report the unknown opcode itself, not the None
            # returned by the failed lookup.
            raise ValueError(f'unknown operation: {self.__op}')
|
|
|
+
|
|
|
class joined(context):
    """Combines two contexts: 'mul' (*) requires both, 'add' (+) either."""

    def __init__(self, op, set1, set2):
        super().__init__()
        self.__op = op      # join operation: 'mul' (AND) or 'add' (OR)
        self.__set1 = set1  # left-hand context
        self.__set2 = set2  # right-hand context

    def __getattr__(self, name):
        if name != 'opcode':
            return super().__getattr__(name)
        op = self.__op
        if op == 'mul':
            return '*'
        if op == 'add':
            return '+'
        raise ValueError(f'unknown operation: {op}')

    def __repr__(self):
        return f"({self.__set1} {self.opcode} {self.__set2})"

    def __validate__(self, param, value):
        """Accept `value` when both sides do ('mul') or either side does ('add')."""
        first = self.__set1.__validate__(param, value)
        second = self.__set2.__validate__(param, value)
        op = self.__op
        if op == 'mul':
            return (first and second)
        if op == 'add':
            return (first or second)
        raise ValueError(f'unknown operation: {op}')
|
|
|
+
|
|
|
### start script upon import
# Importing this module doubles as running it: on `import helper` the
# available datasets are listed and `helper.datasets` becomes a ready-made
# unrestricted query context.
if __name__ != '__main__':
    if root_metadata is not None:
        describe_datasets()
    datasets = _datasets()
|