winedrrawio.py 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120
"""
Class for reading data from WinEdr, a software tool written by
John Dempster.

WinEdr is free:
http://spider.science.strath.ac.uk/sipbs/software.htm

Author: Samuel Garcia
"""
  8. from .baserawio import (BaseRawIO, _signal_channel_dtype, _unit_channel_dtype,
  9. _event_channel_dtype)
  10. import numpy as np
  11. import os
  12. import sys
  13. class WinEdrRawIO(BaseRawIO):
  14. extensions = ['EDR', 'edr']
  15. rawmode = 'one-file'
  16. def __init__(self, filename=''):
  17. BaseRawIO.__init__(self)
  18. self.filename = filename
  19. def _source_name(self):
  20. return self.filename
  21. def _parse_header(self):
  22. with open(self.filename, 'rb') as fid:
  23. headertext = fid.read(2048)
  24. headertext = headertext.decode('ascii')
  25. header = {}
  26. for line in headertext.split('\r\n'):
  27. if '=' not in line:
  28. continue
  29. # print '#' , line , '#'
  30. key, val = line.split('=')
  31. if key in ['NC', 'NR', 'NBH', 'NBA', 'NBD', 'ADCMAX', 'NP', 'NZ', 'ADCMAX']:
  32. val = int(val)
  33. elif key in ['AD', 'DT', ]:
  34. val = val.replace(',', '.')
  35. val = float(val)
  36. header[key] = val
  37. self._raw_signals = np.memmap(self.filename, dtype='int16', mode='r',
  38. shape=(header['NP'] // header['NC'], header['NC'],),
  39. offset=header['NBH'])
  40. DT = header['DT']
  41. if 'TU' in header:
  42. if header['TU'] == 'ms':
  43. DT *= .001
  44. self._sampling_rate = 1. / DT
  45. sig_channels = []
  46. for c in range(header['NC']):
  47. YCF = float(header['YCF%d' % c].replace(',', '.'))
  48. YAG = float(header['YAG%d' % c].replace(',', '.'))
  49. YZ = float(header['YZ%d' % c].replace(',', '.'))
  50. ADCMAX = header['ADCMAX']
  51. AD = header['AD']
  52. name = header['YN%d' % c]
  53. chan_id = header['YO%d' % c]
  54. units = header['YU%d' % c]
  55. gain = AD / (YCF * YAG * (ADCMAX + 1))
  56. offset = -YZ * gain
  57. group_id = 0
  58. sig_channels.append((name, chan_id, self._sampling_rate, 'int16',
  59. units, gain, offset, group_id))
  60. sig_channels = np.array(sig_channels, dtype=_signal_channel_dtype)
  61. # No events
  62. event_channels = []
  63. event_channels = np.array(event_channels, dtype=_event_channel_dtype)
  64. # No spikes
  65. unit_channels = []
  66. unit_channels = np.array(unit_channels, dtype=_unit_channel_dtype)
  67. # fille into header dict
  68. self.header = {}
  69. self.header['nb_block'] = 1
  70. self.header['nb_segment'] = [1]
  71. self.header['signal_channels'] = sig_channels
  72. self.header['unit_channels'] = unit_channels
  73. self.header['event_channels'] = event_channels
  74. # insert some annotation at some place
  75. self._generate_minimal_annotations()
  76. def _segment_t_start(self, block_index, seg_index):
  77. return 0.
  78. def _segment_t_stop(self, block_index, seg_index):
  79. t_stop = self._raw_signals.shape[0] / self._sampling_rate
  80. return t_stop
  81. def _get_signal_size(self, block_index, seg_index, channel_indexes):
  82. return self._raw_signals.shape[0]
  83. def _get_signal_t_start(self, block_index, seg_index, channel_indexes):
  84. return 0.
  85. def _get_analogsignal_chunk(self, block_index, seg_index, i_start, i_stop, channel_indexes):
  86. # WARNING check if id or index for signals (in the old IO it was ids
  87. # ~ raw_signals = self._raw_signals[slice(i_start, i_stop), channel_indexes]
  88. if channel_indexes is None:
  89. channel_indexes = np.arange(self.header['signal_channels'].size)
  90. l = self.header['signal_channels']['id'].tolist()
  91. channel_ids = [l.index(c) for c in channel_indexes]
  92. raw_signals = self._raw_signals[slice(i_start, i_stop), channel_ids]
  93. return raw_signals