123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120 |
- """
- Class for reading data from WinEdr, a software tool written by
- John Dempster.
- WinEdr is free:
- http://spider.science.strath.ac.uk/sipbs/software.htm
- Author: Samuel Garcia
- """
- from .baserawio import (BaseRawIO, _signal_channel_dtype, _unit_channel_dtype,
- _event_channel_dtype)
- import numpy as np
- import os
- import sys
class WinEdrRawIO(BaseRawIO):
    """
    Raw IO class for WinEdr ``.edr`` files.

    The file begins with an ASCII text header made of ``KEY=value`` lines.
    The samples are memory-mapped as int16 starting at the byte offset given
    by the ``NBH`` header field. A single block holding a single segment is
    exposed, with no event channels and no unit (spike) channels.
    """
    extensions = ['EDR', 'edr']
    rawmode = 'one-file'

    def __init__(self, filename=''):
        """Store the path of the file to read; nothing is opened here."""
        BaseRawIO.__init__(self)
        self.filename = filename

    def _source_name(self):
        """Return the file name this reader was created with."""
        return self.filename

    def _parse_header(self):
        """
        Parse the text header, memory-map the samples and fill ``self.header``.

        Raises ``KeyError`` when a header field used below is absent and
        ``ValueError`` when the header is not ASCII or a numeric field cannot
        be converted.
        """
        with open(self.filename, 'rb') as fid:
            headertext = fid.read(2048).decode('ascii')

        # Fields converted to int / float; every other field stays a string.
        int_keys = ('NC', 'NR', 'NBH', 'NBA', 'NBD', 'ADCMAX', 'NP', 'NZ')
        float_keys = ('AD', 'DT')

        header = {}
        for line in headertext.split('\r\n'):
            # Split on the first '=' only, so that a value which itself
            # contains '=' does not make the parsing fail.
            key, sep, val = line.partition('=')
            if not sep:
                continue
            if key in int_keys:
                val = int(val)
            elif key in float_keys:
                # Floats may be written with a decimal comma.
                val = float(val.replace(',', '.'))
            header[key] = val

        # One row per sample, one column per channel.
        self._raw_signals = np.memmap(self.filename, dtype='int16', mode='r',
                                      shape=(header['NP'] // header['NC'], header['NC'],),
                                      offset=header['NBH'])

        # DT is the sampling interval; it is rescaled when the time unit
        # field TU says 'ms'.
        sampling_interval = header['DT']
        if header.get('TU') == 'ms':
            sampling_interval *= .001
        self._sampling_rate = 1. / sampling_interval

        ADCMAX = header['ADCMAX']
        AD = header['AD']
        sig_channels = []
        for c in range(header['NC']):
            # Per-channel fields are kept as strings by the loop above,
            # so they are converted here (decimal comma accepted).
            YCF = float(header['YCF%d' % c].replace(',', '.'))
            YAG = float(header['YAG%d' % c].replace(',', '.'))
            YZ = float(header['YZ%d' % c].replace(',', '.'))
            name = header['YN%d' % c]
            chan_id = header['YO%d' % c]
            units = header['YU%d' % c]
            gain = AD / (YCF * YAG * (ADCMAX + 1))
            offset = -YZ * gain
            group_id = 0
            sig_channels.append((name, chan_id, self._sampling_rate, 'int16',
                                 units, gain, offset, group_id))
        sig_channels = np.array(sig_channels, dtype=_signal_channel_dtype)

        # No events
        event_channels = np.array([], dtype=_event_channel_dtype)

        # No spikes
        unit_channels = np.array([], dtype=_unit_channel_dtype)

        # fill into header dict
        self.header = {}
        self.header['nb_block'] = 1
        self.header['nb_segment'] = [1]
        self.header['signal_channels'] = sig_channels
        self.header['unit_channels'] = unit_channels
        self.header['event_channels'] = event_channels

        # insert some annotation at some place
        self._generate_minimal_annotations()

    def _segment_t_start(self, block_index, seg_index):
        """Return the segment start time, always 0."""
        return 0.

    def _segment_t_stop(self, block_index, seg_index):
        """Return the segment stop time: number of samples / sampling rate."""
        return self._raw_signals.shape[0] / self._sampling_rate

    def _get_signal_size(self, block_index, seg_index, channel_indexes):
        """Return the number of samples per channel."""
        return self._raw_signals.shape[0]

    def _get_signal_t_start(self, block_index, seg_index, channel_indexes):
        """Return the start time of the signals, always 0."""
        return 0.

    def _get_analogsignal_chunk(self, block_index, seg_index, i_start, i_stop, channel_indexes):
        """
        Return the raw samples ``[i_start:i_stop]`` of the requested channels.

        When ``channel_indexes`` is None, ``np.arange(n_channels)`` is requested.
        Raises ``ValueError`` if a requested value is not found in the channel
        'id' column.
        """
        # NOTE(review): the requested values are searched in the 'id' column
        # (filled from the YO header fields) and the position found is used as
        # the column of the memory-mapped array. Confirm whether callers pass
        # channel ids or channel indexes.
        if channel_indexes is None:
            channel_indexes = np.arange(self.header['signal_channels'].size)
        known_ids = self.header['signal_channels']['id'].tolist()
        columns = [known_ids.index(c) for c in channel_indexes]
        return self._raw_signals[slice(i_start, i_stop), columns]
|