- from pathlib import Path
- import yaml
- import munch
- import pandas as pd
- import numpy as np
- from helpers import data_management as dm
- from helpers.nsp import fix_timestamps
- import os
- import re
- import logging
# First / last lines of a valid events file look like
# "<timestamp>,...Block, start'" and "<timestamp>,...Block, stop'".
EVENT_FIRST_LINE_RE = re.compile(r"^(\d+),.*Block, start'$")
EVENT_LAST_LINE_RE = re.compile(r"^(\d+),.*Block, stop'$")

logger = logging.getLogger('KIAP.sessions')


def check_event_file_format(ev_file):
    """Return True if the first and last line of *ev_file* match the expected
    block start/stop schema.

    Only the first line is read and the last line is found by scanning
    backwards from the end of the file, so the file is never loaded fully
    into memory.

    Parameters
    ----------
    ev_file : path-like
        Path to the events file to validate.

    Returns
    -------
    bool
        True when both the first and the last line match the schema.
    """
    with open(ev_file, "rb") as f:
        first = f.readline().decode()  # read the first line
        try:
            # Jump to the second-to-last byte and walk backwards until the
            # newline that terminates the second-to-last line is found.
            f.seek(-2, os.SEEK_END)
            while f.read(1) != b"\n":
                f.seek(-2, os.SEEK_CUR)
        except OSError:
            # Seeking before the start of the file: the file is empty or
            # contains a single (short) line, so the "last" line is the
            # whole file.
            f.seek(0)
        last = f.readline().decode()  # read the last line
    return (EVENT_FIRST_LINE_RE.match(first) is not None) and (EVENT_LAST_LINE_RE.match(last) is not None)
def _relative_tail(p):
    """Return the last two path components of *p* as a string, e.g. 'dir/file.ext'."""
    return str(Path(*Path(p).parts[-2:]))


def get_sessions(path, mode=None, n=None, start=0):
    """Scan *path* recursively for ``config_dump_*.yaml`` session configs and
    collect the per-session file references.

    Parameters
    ----------
    path : path-like
        Root directory to search.
    mode : str, optional
        If given, keep only sessions whose ``speller.type`` equals *mode*.
    n : int, optional
        Stop once *n* sessions have been collected.
    start : int
        Index of the first config file to consider (earlier ones are skipped).

    Returns
    -------
    (pandas.DataFrame, int)
        One row per accepted session (columns: mode, cfg, events, data, log)
        and the total number of config files found.
    """
    this_path = Path(path)
    config_files = sorted(this_path.glob("**/config_dump_*.yaml"))
    config_files_read = []
    for cfg_path in config_files[start:]:
        try:
            with open(cfg_path, 'r') as f:
                # NOTE(review): yaml.Loader can construct arbitrary Python
                # objects -- only safe on trusted config dumps.
                cfg = munch.Munch.fromYAML(f, Loader=yaml.Loader)
            logger.info(f"Loading {cfg_path}")
            if (mode is None) or (mode == cfg.speller.type):
                cfg_d = {
                    'mode': cfg.speller.type,
                    'cfg': _relative_tail(cfg_path),
                    'events': cfg.file_handling.get('filename_events'),
                    'data': cfg.file_handling.get('filename_data'),
                    'log': cfg.file_handling.get('filename_log_info')
                }
                # Skip sessions with incomplete file information.
                if any(cfg_d[k] is None for k in ('events', 'data', 'log')):
                    continue
                cfg_d['events'] = _relative_tail(cfg_d['events'])
                if not check_event_file_format(this_path / cfg_d['events']):
                    # Invalid event files are only reported, not skipped.
                    logger.warning(f"{cfg_d['events']} is not valid (first / last line does not match schema)")
                cfg_d['data'] = _relative_tail(cfg_d['data'])
                cfg_d['log'] = _relative_tail(cfg_d['log'])
                config_files_read.append(cfg_d)
                if (n is not None) and (len(config_files_read) >= n):
                    break
        except FileNotFoundError as e:
            logger.warning(f"A file related to {cfg_path} was not found ({e}).")
    cfg_pd = pd.DataFrame(config_files_read)
    return (cfg_pd, len(config_files))
# Event lines start with an integer timestamp followed by a comma.
TRE = re.compile(r"^(\d+),.*$")


def get_session_data(path, session):
    """Load data for a session.

    Requires the session configuration file to deduce the file format.

    Parameters
    ----------
    path : path-like
        Session root directory.
    session : mapping
        A row as produced by :func:`get_sessions`, with keys
        'cfg', 'data' and 'events' (paths relative to *path*).

    Returns
    -------
    tuple
        (time vector for samples in seconds, data, recorded channel list,
        [first event time, last event time] in seconds).
    """
    fn_sess = Path(path, session['data'])
    fn_evs = Path(path, session['events'])
    logger.debug(f"gsd loading {fn_evs}")
    with open(Path(path, session['cfg']), 'r') as f:
        params = munch.Munch.fromYAML(f, Loader=yaml.Loader)
    datav, ts, ch_rec_list = dm.get_session(fn_sess, params=params)
    ts, offsets, _ = fix_timestamps(ts)
    with open(fn_evs, 'r') as f:
        evs = f.readlines()
    # Keep only lines that begin with a numeric timestamp; malformed lines
    # are ignored instead of raising AttributeError on a failed match.
    times = [int(m.group(1)) for m in map(TRE.match, evs) if m is not None]
    tsevs, _, _ = fix_timestamps(np.array(times))
    # Divide by 3e4 to convert timestamps to seconds (presumably a 30 kHz
    # sample clock -- TODO confirm against the recording setup).
    evt = [tsevs[0] / 3e4, tsevs[-1] / 3e4]
    tv = ts / 3e4
    return tv, datav, ch_rec_list, evt