123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508 |
- #
- # MIT License
- #
- # Copyright (c) 2019 Keisuke Sehara
- #
- # Permission is hereby granted, free of charge, to any person obtaining a copy
- # of this software and associated documentation files (the "Software"), to deal
- # in the Software without restriction, including without limitation the rights
- # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- # copies of the Software, and to permit persons to whom the Software is
- # furnished to do so, subject to the following conditions:
- #
- # The above copyright notice and this permission notice shall be included in all
- # copies or substantial portions of the Software.
- #
- # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- # SOFTWARE.
- #
- import sys as _sys
- import json as _js
- import re as _re
- from pathlib import Path as _Path
- import collections as _cl
- from warnings import warn as _warn
- import numpy as _np
- import pandas as _pd
# name of the JSON file, at the repository root, that describes the datasets
DATASETS_METADATA_FILE = 'DATASETS.json'
# usage note appended to error messages when the module cannot initialize
HOW_TO_USE = f"""
------
This 'helper.py' is written to work at the **root repository directory**.
1. please make sure that the directory structure of the datasets remain unchanged
(you can miss data files, though).
2. please reposition this file inside the root directory (where you can find
'REPOSITORY.json').
3. change the current directory to the root repository directory.
4. from a Python session, run `import helper`.
"""
# session directories are named '<type><YYYY-MM-DD>-<index>'
# (groups: session type, date, session index; see get_sessions_impl)
SESSION_PATTERN = _re.compile(r'([a-zA-Z]+)([0-9]{4}-[0-9]{2}-[0-9]{2})-([0-9]+)')
# trailing '-<subdomain[-subdomain...]>' suffix of a file's stem
SUBDOMAIN_PATTERN = _re.compile(r'-([a-zA-Z0-9-]+)$')
# '_run<number>_' fragment embedded in a file name
RUN_PATTERN = _re.compile(r'_run([0-9]+)_')
# the directory containing this helper file is taken as the repository root
rootdir = _Path(__file__).parent
datasetdir = rootdir / "datasets"
def __read_root_metadata(datasetdir):
    """Load the repository-level dataset description (DATASETS.json).

    Parameters
    ----------
    datasetdir: str or pathlib.Path
        the directory expected to contain DATASETS_METADATA_FILE.

    Returns
    -------
    collections.OrderedDict
        the parsed metadata, preserving key order of the JSON file.

    Raises
    ------
    RuntimeError
        when the directory or the metadata file does not exist.
    """
    rootdir = _Path(datasetdir)
    # use the converted Path consistently: the original tested
    # `datasetdir.is_dir()`, which fails when a plain string is passed
    if not rootdir.is_dir():
        raise RuntimeError(f"not a directory: {rootdir}")
    metadata_file = rootdir / DATASETS_METADATA_FILE
    if not metadata_file.is_file():
        raise RuntimeError(f"not a file: {metadata_file}")
    with open(metadata_file, 'r') as src:
        return _js.load(src, object_hook=_cl.OrderedDict)
def __read_csv_metadata(filename):
    """Load a CSV metadata table (e.g. 'SUBJECTS.csv') from `datasetdir`.

    Returns a pandas.DataFrame, or None (after printing a warning on
    stderr) when the file does not exist.
    """
    metadata_file = datasetdir / filename
    if not metadata_file.is_file():
        # report the path that is actually missing (the original printed
        # a literal "(unknown)" placeholder instead of the file name)
        print(f"***cannot read from: {metadata_file}", file=_sys.stderr)
        return None
    return _pd.read_csv(str(metadata_file))
def __errormsg(msg):
    """Report `msg` on stderr, followed by the how-to-use note."""
    text = f"***{msg} {HOW_TO_USE}"
    print(text, file=_sys.stderr)
# --- module-level metadata, loaded once at import time ---
root_metadata = None
try:
    root_metadata = __read_root_metadata(rootdir)
except RuntimeError as e:
    __errormsg(f"failed to read from '{DATASETS_METADATA_FILE}' ({e})")

# per-subject / per-session tables; either may be None when its CSV file
# is missing (`__read_csv_metadata` warns and returns None in that case)
subjects_metadata = __read_csv_metadata("SUBJECTS.csv")
sessions_metadata = __read_csv_metadata("SESSIONS.csv")

# context keys derived from the directory structure itself
session_params = ('session_name', 'session_type', 'date', 'session_index')
base_params = tuple(set(('dataset', 'subject', 'domain', 'file', 'subdomain')
                        + session_params))
# all recognized context keys, including the CSV columns; guard against a
# None table so a missing CSV degrades gracefully instead of raising
# AttributeError while the module is being imported
parameters = tuple(set(base_params
                       + tuple(subjects_metadata.columns if subjects_metadata is not None else ())
                       + tuple(sessions_metadata.columns if sessions_metadata is not None else ())))
def _update_with_subject(orig_context, subject_name):
    """Return a new context dict: the SUBJECTS.csv row for `subject_name`
    merged with `orig_context` (existing context keys take precedence).

    Raises RuntimeError when the subject is not found in the table.
    """
    selection = subjects_metadata.loc[subjects_metadata.name == subject_name, :]
    if selection.shape[0] == 0:
        raise RuntimeError(f"subject not found in metadata: {subject_name}")
    merged = selection.iloc[0].to_dict()
    del merged['name']  # re-exposed under the canonical 'subject' key
    merged['subject'] = subject_name
    merged.update(orig_context)  # the original context wins on conflicts
    return merged
def _update_with_session(orig_context, session_name, session_type, date, session_index):
    """Return a new context dict: the SESSIONS.csv row matching the
    subject/date/index merged with `orig_context` (existing context keys
    take precedence).

    Raises RuntimeError when no matching row exists.
    """
    same_subject = _np.array(sessions_metadata.subject == orig_context['subject'])
    same_date = _np.array(sessions_metadata.date == date)
    same_index = (_np.array(sessions_metadata['index'], dtype=int) == int(session_index))
    selection = sessions_metadata.loc[same_subject * same_date * same_index, :]
    if selection.shape[0] == 0:
        raise RuntimeError(f"session not found in metadata: {session_name}")
    merged = selection.iloc[0].to_dict()
    for redundant in ('subject', 'date', 'index'):
        del merged[redundant]  # re-added below under the canonical key names
    merged['session_name'] = session_name
    merged['session_type'] = session_type
    merged['date'] = date
    merged['session_index'] = session_index
    merged.update(orig_context)  # the original context wins on conflicts
    return merged
def describe_datasets(indent=2):
    """Print a human-readable summary of every available dataset.

    indent: the indentation unit -- either a number of spaces or a
            literal string.
    """
    if root_metadata is None:
        return __errormsg("metadata has not been initialized properly.")
    if isinstance(indent, int):
        indent = ' ' * indent
    if len(root_metadata) == 0:
        print("***no datasets available in this directory!", file=_sys.stderr)
        return
    print("Available datasets")
    for ds_name, ds_desc in root_metadata.items():
        print(f"--------------------\n\ndataset '{ds_name}':")
        description = ds_desc.get('description', None)
        if description:
            print(f"{indent*1}(description)")
            print(f"{indent*2}{description}")
        domains = ds_desc.get("domains", {})
        if len(domains) == 0:
            print(f"{indent*1}(no available domains)")
            continue
        print(f"{indent*1}(domains)")
        for dom_name, dom_desc in domains.items():
            suffix = dom_desc.get('suffix', '')
            if len(suffix.strip()) == 0:
                suffix = 'no suffix'
            print(f"{indent*2}- domain '{dom_name}' ({suffix})")
            print(f"{indent*3}{dom_desc.get('description', '(no description)')}")
class dataspec(_cl.namedtuple('_dataspec', ('context', 'data'))):
    """A (context, data) pair.

    `context` is a dict of metadata values (dataset, subject, date, ...)
    and `data` is the associated payload (typically a pathlib.Path).
    Context entries can also be read as attributes, e.g. `spec.subject`.
    """
    def __getattr__(self, name):
        # Only reached when normal attribute lookup fails.  The original
        # called `super().__getattr__(name)` first, which always raises
        # AttributeError (no class in a tuple's MRO defines __getattr__),
        # so context keys were never actually reachable as attributes.
        try:
            return self.context[name]
        except KeyError:
            raise AttributeError(name) from None

    def convert_data(self, datafunc):
        """Return a new dataspec with the same context and `datafunc(data)`."""
        return self.__class__(self.context, datafunc(self.data))
class predicate:
    """Base class of the dataset-query DSL.

    A predicate filters entries while walking the on-disk hierarchy
    dataset -> subject -> session -> domain -> file.  Subclasses implement
    `__validate__(param, context)` to accept or reject a candidate context.
    The result of each traversal level is cached per instance; pass
    `recalculate=True` to any getter to refresh.
    """
    # names that may be fetched as plain attributes (see __getattr__)
    _retrievable = ('datasets', 'subjects', 'domains', 'files', 'subdomains') \
                    + ('session_names', 'session_types', 'dates', 'session_indices')

    def __init__(self):
        # per-level results, keyed by level name ('datasets', 'files', ...)
        self.__cached = {}

    def __getattr__(self, name):
        # NOTE(fix): 'subdomain' is itself listed in `parameters`, so this
        # test must come before the generic parameter branch -- in the
        # original ordering the ValueError below was unreachable dead code.
        if name == 'subdomain':
            raise ValueError("use '<context>.has_subdomain(<subdom>)' expression for restricting to a subdomain")
        elif name in parameters:
            if name == 'file':
                raise NameError("use 'files' to retrieve file paths")
            return parameter(self, name)
        elif name in self._retrievable:
            return self.retrieve(name)
        else:
            raise AttributeError(name)

    def get_datasets(self, as_spec=True, recalculate=False):
        """Datasets passing validation, as dataspecs (names when as_spec=False)."""
        if ('datasets' not in self.__cached.keys()) or (recalculate == True):
            dss = []
            for ds_name in root_metadata.keys():
                spec = dataspec(dict(dataset=ds_name), datasetdir / ds_name)
                if spec.data.is_dir():
                    if self.__validate__('dataset', spec.context):
                        dss.append(spec)
            self.__cached['datasets'] = dss
            self.__cached['dataset_names'] = [item.context['dataset'] for item in dss]
        if as_spec == True:
            return tuple(self.__cached['datasets'])
        else:
            return tuple(self.__cached['dataset_names'])

    def get_subjects(self, as_spec=True, recalculate=False):
        """Subject directories passing validation, one level below datasets."""
        if ('subjects' not in self.__cached.keys()) or (recalculate == True):
            subs = []
            for ds in self.get_datasets(as_spec=True, recalculate=recalculate):
                for child in ds.data.iterdir():
                    if not child.is_dir():
                        continue
                    cxt = _update_with_subject(ds.context, child.name)
                    if self.__validate__('subject', cxt):
                        spec = dataspec(cxt, child)
                        subs.append(spec)
            self.__cached['subjects'] = subs
            self.__cached['subject_names'] = sorted(set(item.context['subject'] for item in subs))
        if as_spec == True:
            return tuple(self.__cached['subjects'])
        else:
            return tuple(self.__cached['subject_names'])

    def get_session_names(self, as_spec=True, recalculate=False):
        return self.get_sessions_impl(mode='session_names', as_spec=as_spec, recalculate=recalculate)

    def get_session_types(self, as_spec=True, recalculate=False):
        return self.get_sessions_impl(mode='session_types', as_spec=as_spec, recalculate=recalculate)

    def get_dates(self, as_spec=True, recalculate=False):
        return self.get_sessions_impl(mode='dates', as_spec=as_spec, recalculate=recalculate)

    def get_session_indices(self, as_spec=True, recalculate=False):
        return self.get_sessions_impl(mode='session_indices', as_spec=as_spec, recalculate=recalculate)

    def get_sessions_impl(self, mode='dates', as_spec=True, recalculate=False):
        """Session directories passing validation.

        `mode` selects which name list is returned when as_spec=False
        ('dates', 'session_names', 'session_types' or 'session_indices').
        """
        if ('sessions' not in self.__cached.keys()) or (recalculate == True):
            sessions = []
            for sub in self.get_subjects(as_spec=True, recalculate=recalculate):
                for child in sub.data.iterdir():
                    if not child.is_dir():
                        continue
                    is_session = SESSION_PATTERN.search(child.name)
                    if not is_session:
                        continue
                    stype = is_session.group(1)
                    date = is_session.group(2)
                    idx = is_session.group(3)
                    sname = f"{stype}{date}-{idx}"
                    cxt = _update_with_session(sub.context, sname, stype, date, idx)
                    if self.__validate__('session_name', cxt):
                        spec = dataspec(cxt, child)
                        sessions.append(spec)
            self.__cached['sessions'] = sessions
            self.__cached['dates'] = sorted(set(item.context['date'] for item in sessions))
            self.__cached['session_names'] = sorted(set(item.context['session_name'] for item in sessions))
            self.__cached['session_types'] = sorted(set(item.context['session_type'] for item in sessions))
            self.__cached['session_indices'] = sorted(set(item.context['session_index'] for item in sessions))
        if as_spec == True:
            # return a tuple, for consistency with every other level getter
            # (the original leaked the mutable cached list)
            return tuple(self.__cached['sessions'])
        else:
            return tuple(self.__cached[mode])

    def get_domains(self, as_spec=True, recalculate=False):
        """Domain directories passing validation, one level below sessions."""
        if ('domains' not in self.__cached.keys()) or (recalculate == True):
            doms = []
            for sessions in self.get_dates(as_spec=True, recalculate=recalculate):
                for child in sessions.data.iterdir():
                    if not child.is_dir():
                        continue
                    dom = child.name
                    cxt = sessions.context.copy()
                    cxt['domain'] = dom
                    if self.__validate__('domain', cxt):
                        spec = dataspec(cxt, child)
                        doms.append(spec)
            self.__cached['domains'] = doms
            self.__cached['domain_names'] = sorted(set(item.context['domain'] for item in doms))
        if as_spec == True:
            return tuple(self.__cached['domains'])
        else:
            return tuple(self.__cached['domain_names'])

    def get_files(self, as_spec=True, recalculate=False):
        return self.get_subdomains_impl(mode='files', as_spec=as_spec, recalculate=recalculate)

    def get_subdomains(self, as_spec=True, recalculate=False):
        return self.get_subdomains_impl(mode='subdomains', as_spec=as_spec, recalculate=recalculate)

    def get_subdomains_impl(self, mode='subdomains', as_spec=True, recalculate=False):
        """Data files passing validation.

        `mode` selects the name list returned when as_spec=False:
        'subdomains' (unique subdomain tuples) or 'files' (sorted paths).
        """
        if ('files' not in self.__cached.keys()) or (recalculate == True):
            files = []
            for dom in self.get_domains(as_spec=True, recalculate=recalculate):
                for child in dom.data.iterdir():
                    # skip dot files
                    if child.name.startswith('.'):
                        continue
                    has_subdomain = SUBDOMAIN_PATTERN.search(child.stem)
                    if has_subdomain:
                        subdomains = tuple(has_subdomain.group(1).split('-'))
                    else:
                        subdomains = ()
                    has_run = RUN_PATTERN.search(child.name)
                    cxt = dom.context.copy()
                    cxt['subdomains'] = subdomains
                    if has_run:
                        cxt['run'] = int(has_run.group(1))
                    if self.__validate__('subdomain', cxt):
                        spec = dataspec(cxt, child)
                        files.append(spec)
            self.__cached['files'] = files
            self.__cached['file_paths'] = sorted(str(item.data) for item in files)
            self.__cached['subdomains'] = sorted(set(item.context['subdomains'] for item in files))
        if as_spec == True:
            return tuple(self.__cached['files'])
        elif mode == 'subdomains':
            return tuple(self.__cached['subdomains'])
        else:
            return tuple(self.__cached['file_paths'])

    def retrieve(self, param, recalculate=False):
        """Dispatch retrieval by object-type name ('datasets', 'files', ...)."""
        if root_metadata is None:
            # NOTE(fix): inside a class body the bare name `__errormsg`
            # is mangled to `_predicate__errormsg` and raises NameError;
            # fetch the module-level helper explicitly.
            return globals()['__errormsg']("metadata has not been initialized properly.")
        options = dict(as_spec=False, recalculate=recalculate)
        if param == 'datasets':
            return self.get_datasets(**options)
        elif param == 'subjects':
            return self.get_subjects(**options)
        elif param in ('dates', 'session_names', 'session_types', 'session_indices'):
            return self.get_sessions_impl(mode=param, **options)
        elif param == 'domains':
            return self.get_domains(**options)
        elif param in ('files', 'subdomains'):
            return self.get_subdomains_impl(mode=param, **options)
        else:
            raise ValueError(f"unknown object type for retrieval: {param}")

    def __validate__(self, param, value):
        """Accept/reject a candidate context; implemented by subclasses."""
        raise NotImplementedError(f"{self.__class__.__name__}.__validate__")

    def __join__(self, op, other):
        """Combine with another predicate ('add' = either, 'mul' = both)."""
        if not isinstance(other, predicate):
            raise ValueError(f"cannot join {other.__class__} (expected conditional or joined)")
        return joined(op, self, other)

    def __add__(self, other):
        return self.__join__('add', other)

    def __mul__(self, other):
        return self.__join__('mul', other)

    def has_subdomain(self, subdom):
        """Restrict to files whose subdomain tuple contains `subdom`."""
        return conditional('has', parameter(self, 'subdomains'), subdom)
class _datasets(predicate):
    """The unconstrained query root: every candidate context is accepted."""

    def __init__(self):
        super().__init__()

    def __repr__(self):
        return '<any>'

    def __validate__(self, param, value):
        # no restriction at the root of a query expression
        return True
class parameter:
    """A named context variable (e.g. `datasets.subject`).

    Comparing a parameter with a string does not yield a boolean; it
    builds a `conditional` predicate that is evaluated during retrieval.
    """

    def __init__(self, parent, name):
        self.__parent = parent
        self.__name = name

    def __getattr__(self, name):
        # only the wrapped variable name is exposed; anything else is an error
        if name != 'name':
            raise AttributeError(name)
        return self.__name

    def __repr__(self):
        return f"{repr(self.__parent)}.{self.__name}"

    def __cond__(self, op, name):
        # conditions are only defined against string values
        if not isinstance(name, str):
            raise ValueError(f"cannot compare to {name.__class__} (expected a string)")
        return conditional(op, self, name)

    def __eq__(self, value):
        return self.__cond__('eq', value)

    def __ne__(self, value):
        return self.__cond__('ne', value)

    def __gt__(self, value):
        return self.__cond__('gt', value)

    def __lt__(self, value):
        return self.__cond__('lt', value)

    def __ge__(self, value):
        return self.__cond__('ge', value)

    def __le__(self, value):
        return self.__cond__('le', value)

    def __validate__(self, param, context):
        # validation is delegated to the owning predicate
        return self.__parent.__validate__(param, context)
class conditional(predicate):
    """A single comparison over one context variable, e.g.
    `(datasets.subject == 'A')`; combines with other predicates via +/*.
    """
    # opcode strings, used only for repr()
    _opcodes = dict(eq='==',
                    ne='!=',
                    gt='>',
                    ge='>=',
                    lt='<',
                    le='<=',
                    has='has')
    # the actual comparison callables, keyed by operation name
    _ops = {
        'eq': (lambda _x, _v: _x == _v),
        'ne': (lambda _x, _v: _x != _v),
        'gt': (lambda _x, _v: _x > _v),
        'ge': (lambda _x, _v: _x >= _v),
        'lt': (lambda _x, _v: _x < _v),
        'le': (lambda _x, _v: _x <= _v),
        'has': (lambda _x, _v: _v in _x)
    }

    def __init__(self, op, param, value):
        super().__init__()
        self.__op = op          # operation key into _ops/_opcodes
        self.__param = param    # the `parameter` on the left-hand side
        self.__value = value    # the right-hand-side value

    def __getattr__(self, name):
        if name == 'opcode':
            opcode = self._opcodes.get(self.__op, None)
            if opcode:
                return opcode
            else:
                # NOTE(fix): the original referenced a bare `op`, which is
                # undefined in this method and raised NameError instead
                raise ValueError(f'unknown operation: {self.__op}')
        else:
            return super().__getattr__(name)

    def __repr__(self):
        return f"({self.__param} {self.opcode} {repr(self.__value)})"

    def __validate__(self, param, context):
        """Accept `context` unless this condition applies at the current
        level `param` and the comparison fails."""
        if not self.__param.__validate__(param, context):
            return False
        elif self.__param.name not in base_params:
            # subject-, session- or trial- related variables
            if self.__param.name not in context.keys():
                return True
        elif (self.__param.name not in session_params) \
             or (param not in session_params):
            if param != self.__param.name:
                return True
        op = self._ops.get(self.__op, None)
        val = context.get(self.__param.name, None)
        if op:
            # NOTE(fix): compare on presence (`is not None`), not truthiness,
            # so falsy-but-present values such as 0 or '' are still tested
            if val is not None:
                return op(val, self.__value)
            else:
                return True
        else:
            # NOTE(fix): report the operation key (the original formatted
            # `op`, which is None on this branch)
            raise ValueError(f'unknown operation: {self.__op}')
class joined(predicate):
    """The combination of two predicates: '*' (both) or '+' (either)."""

    def __init__(self, op, set1, set2):
        super().__init__()
        self.__op = op        # 'mul' (intersection) or 'add' (union)
        self.__set1 = set1
        self.__set2 = set2

    def __getattr__(self, name):
        if name != 'opcode':
            return super().__getattr__(name)
        op = self.__op
        if op == 'mul':
            return '*'
        if op == 'add':
            return '+'
        raise ValueError(f'unknown operation: {op}')

    def __repr__(self):
        return f"({self.__set1} {self.opcode} {self.__set2})"

    def __validate__(self, param, value):
        # a candidate passes when both ('mul') or either ('add') side accepts
        first = self.__set1.__validate__(param, value)
        second = self.__set2.__validate__(param, value)
        op = self.__op
        if op == 'mul':
            return (first and second)
        if op == 'add':
            return (first or second)
        raise ValueError(f'unknown operation: {op}')
### start script upon import
# the module-level entry point: an unconstrained query over all datasets
datasets = _datasets()

# greet the user with a dataset summary when imported (not when executed)
if (__name__ != '__main__') and (root_metadata is not None):
    describe_datasets()
|