winwcprawio.py 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166
  1. """
  2. Class for reading data from WinWCP, a software tool written by
  3. John Dempster.
  4. WinWCP is free:
  5. http://spider.science.strath.ac.uk/sipbs/software.htm
  6. Author : sgarcia
  7. Author: Samuel Garcia
  8. """
  9. from .baserawio import (BaseRawIO, _signal_channel_dtype, _unit_channel_dtype,
  10. _event_channel_dtype)
  11. import numpy as np
  12. import struct
  13. class WinWcpRawIO(BaseRawIO):
  14. extensions = ['wcp']
  15. rawmode = 'one-file'
  16. def __init__(self, filename=''):
  17. BaseRawIO.__init__(self)
  18. self.filename = filename
  19. def _source_name(self):
  20. return self.filename
  21. def _parse_header(self):
  22. SECTORSIZE = 512
  23. # only one memmap for all segment to avoid
  24. # "error: [Errno 24] Too many open files"
  25. self._memmap = np.memmap(self.filename, dtype='uint8', mode='r')
  26. with open(self.filename, 'rb') as fid:
  27. headertext = fid.read(1024)
  28. headertext = headertext.decode('ascii')
  29. header = {}
  30. for line in headertext.split('\r\n'):
  31. if '=' not in line:
  32. continue
  33. # print '#' , line , '#'
  34. key, val = line.split('=')
  35. if key in ['NC', 'NR', 'NBH', 'NBA', 'NBD', 'ADCMAX', 'NP', 'NZ', ]:
  36. val = int(val)
  37. elif key in ['AD', 'DT', ]:
  38. val = val.replace(',', '.')
  39. val = float(val)
  40. header[key] = val
  41. nb_segment = header['NR']
  42. self._raw_signals = {}
  43. all_sampling_interval = []
  44. # loop for record number
  45. for seg_index in range(header['NR']):
  46. offset = 1024 + seg_index * (SECTORSIZE * header['NBD'] + 1024)
  47. # read analysis zone
  48. analysisHeader = HeaderReader(fid, AnalysisDescription).read_f(offset=offset)
  49. # read data
  50. NP = (SECTORSIZE * header['NBD']) // 2
  51. NP = NP - NP % header['NC']
  52. NP = NP // header['NC']
  53. NC = header['NC']
  54. ind0 = offset + header['NBA'] * SECTORSIZE
  55. ind1 = ind0 + NP * NC * 2
  56. sigs = self._memmap[ind0:ind1].view('int16').reshape(NP, NC)
  57. self._raw_signals[seg_index] = sigs
  58. all_sampling_interval.append(analysisHeader['SamplingInterval'])
  59. assert np.unique(all_sampling_interval).size == 1
  60. self._sampling_rate = 1. / all_sampling_interval[0]
  61. sig_channels = []
  62. for c in range(header['NC']):
  63. YG = float(header['YG%d' % c].replace(',', '.'))
  64. ADCMAX = header['ADCMAX']
  65. VMax = analysisHeader['VMax'][c]
  66. name = header['YN%d' % c]
  67. chan_id = header['YO%d' % c]
  68. units = header['YU%d' % c]
  69. gain = VMax / ADCMAX / YG
  70. offset = 0.
  71. group_id = 0
  72. sig_channels.append((name, chan_id, self._sampling_rate, 'int16',
  73. units, gain, offset, group_id))
  74. sig_channels = np.array(sig_channels, dtype=_signal_channel_dtype)
  75. # No events
  76. event_channels = []
  77. event_channels = np.array(event_channels, dtype=_event_channel_dtype)
  78. # No spikes
  79. unit_channels = []
  80. unit_channels = np.array(unit_channels, dtype=_unit_channel_dtype)
  81. # fille into header dict
  82. self.header = {}
  83. self.header['nb_block'] = 1
  84. self.header['nb_segment'] = [nb_segment]
  85. self.header['signal_channels'] = sig_channels
  86. self.header['unit_channels'] = unit_channels
  87. self.header['event_channels'] = event_channels
  88. # insert some annotation at some place
  89. self._generate_minimal_annotations()
  90. def _segment_t_start(self, block_index, seg_index):
  91. return 0.
  92. def _segment_t_stop(self, block_index, seg_index):
  93. t_stop = self._raw_signals[seg_index].shape[0] / self._sampling_rate
  94. return t_stop
  95. def _get_signal_size(self, block_index, seg_index, channel_indexes):
  96. return self._raw_signals[seg_index].shape[0]
  97. def _get_signal_t_start(self, block_index, seg_index, channel_indexes):
  98. return 0.
  99. def _get_analogsignal_chunk(self, block_index, seg_index, i_start, i_stop, channel_indexes):
  100. # WARNING check if id or index for signals (in the old IO it was ids
  101. # ~ raw_signals = self._raw_signals[seg_index][slice(i_start, i_stop), channel_indexes]
  102. if channel_indexes is None:
  103. channel_indexes = np.arange(self.header['signal_channels'].size)
  104. ids = self.header['signal_channels']['id'].tolist()
  105. channel_ids = [ids.index(c) for c in channel_indexes]
  106. raw_signals = self._raw_signals[seg_index][slice(i_start, i_stop), channel_ids]
  107. return raw_signals
  108. AnalysisDescription = [
  109. ('RecordStatus', '8s'),
  110. ('RecordType', '4s'),
  111. ('GroupNumber', 'f'),
  112. ('TimeRecorded', 'f'),
  113. ('SamplingInterval', 'f'),
  114. ('VMax', '8f'),
  115. ]
  116. class HeaderReader():
  117. def __init__(self, fid, description):
  118. self.fid = fid
  119. self.description = description
  120. def read_f(self, offset=0):
  121. self.fid.seek(offset)
  122. d = {}
  123. for key, fmt in self.description:
  124. val = struct.unpack(fmt, self.fid.read(struct.calcsize(fmt)))
  125. if len(val) == 1:
  126. val = val[0]
  127. else:
  128. val = list(val)
  129. d[key] = val
  130. return d