winwcprawio.py 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171
  1. # -*- coding: utf-8 -*-
  2. """
  3. Class for reading data from WinWCP, a software tool written by
  4. John Dempster.
  5. WinWCP is free:
  6. http://spider.science.strath.ac.uk/sipbs/software.htm
  7. Author : sgarcia
  8. Author: Samuel Garcia
  9. """
  10. from __future__ import unicode_literals, print_function, division, absolute_import
  11. from .baserawio import (BaseRawIO, _signal_channel_dtype, _unit_channel_dtype,
  12. _event_channel_dtype)
  13. import numpy as np
  14. import os
  15. import sys
  16. import struct
  17. class WinWcpRawIO(BaseRawIO):
  18. extensions = ['wcp']
  19. rawmode = 'one-file'
  20. def __init__(self, filename=''):
  21. BaseRawIO.__init__(self)
  22. self.filename = filename
  23. def _source_name(self):
  24. return self.filename
  25. def _parse_header(self):
  26. SECTORSIZE = 512
  27. # only one memmap for all segment to avoid
  28. # "error: [Errno 24] Too many open files"
  29. self._memmap = np.memmap(self.filename, dtype='uint8', mode='r')
  30. with open(self.filename, 'rb') as fid:
  31. headertext = fid.read(1024)
  32. headertext = headertext.decode('ascii')
  33. header = {}
  34. for line in headertext.split('\r\n'):
  35. if '=' not in line:
  36. continue
  37. # print '#' , line , '#'
  38. key, val = line.split('=')
  39. if key in ['NC', 'NR', 'NBH', 'NBA', 'NBD', 'ADCMAX', 'NP', 'NZ', ]:
  40. val = int(val)
  41. elif key in ['AD', 'DT', ]:
  42. val = val.replace(',', '.')
  43. val = float(val)
  44. header[key] = val
  45. nb_segment = header['NR']
  46. self._raw_signals = {}
  47. all_sampling_interval = []
  48. # loop for record number
  49. for seg_index in range(header['NR']):
  50. offset = 1024 + seg_index * (SECTORSIZE * header['NBD'] + 1024)
  51. # read analysis zone
  52. analysisHeader = HeaderReader(fid, AnalysisDescription).read_f(offset=offset)
  53. # read data
  54. NP = (SECTORSIZE * header['NBD']) // 2
  55. NP = NP - NP % header['NC']
  56. NP = NP // header['NC']
  57. NC = header['NC']
  58. ind0 = offset + header['NBA'] * SECTORSIZE
  59. ind1 = ind0 + NP * NC * 2
  60. sigs = self._memmap[ind0:ind1].view('int16').reshape(NP, NC)
  61. self._raw_signals[seg_index] = sigs
  62. all_sampling_interval.append(analysisHeader['SamplingInterval'])
  63. assert np.unique(all_sampling_interval).size == 1
  64. self._sampling_rate = 1. / all_sampling_interval[0]
  65. sig_channels = []
  66. for c in range(header['NC']):
  67. YG = float(header['YG%d' % c].replace(',', '.'))
  68. ADCMAX = header['ADCMAX']
  69. VMax = analysisHeader['VMax'][c]
  70. name = header['YN%d' % c]
  71. chan_id = header['YO%d' % c]
  72. units = header['YU%d' % c]
  73. gain = VMax / ADCMAX / YG
  74. offset = 0.
  75. group_id = 0
  76. sig_channels.append((name, chan_id, self._sampling_rate, 'int16',
  77. units, gain, offset, group_id))
  78. sig_channels = np.array(sig_channels, dtype=_signal_channel_dtype)
  79. # No events
  80. event_channels = []
  81. event_channels = np.array(event_channels, dtype=_event_channel_dtype)
  82. # No spikes
  83. unit_channels = []
  84. unit_channels = np.array(unit_channels, dtype=_unit_channel_dtype)
  85. # fille into header dict
  86. self.header = {}
  87. self.header['nb_block'] = 1
  88. self.header['nb_segment'] = [nb_segment]
  89. self.header['signal_channels'] = sig_channels
  90. self.header['unit_channels'] = unit_channels
  91. self.header['event_channels'] = event_channels
  92. # insert some annotation at some place
  93. self._generate_minimal_annotations()
  94. def _segment_t_start(self, block_index, seg_index):
  95. return 0.
  96. def _segment_t_stop(self, block_index, seg_index):
  97. t_stop = self._raw_signals[seg_index].shape[0] / self._sampling_rate
  98. return t_stop
  99. def _get_signal_size(self, block_index, seg_index, channel_indexes):
  100. return self._raw_signals[seg_index].shape[0]
  101. def _get_signal_t_start(self, block_index, seg_index, channel_indexes):
  102. return 0.
  103. def _get_analogsignal_chunk(self, block_index, seg_index, i_start, i_stop, channel_indexes):
  104. # WARNING check if id or index for signals (in the old IO it was ids
  105. # ~ raw_signals = self._raw_signals[seg_index][slice(i_start, i_stop), channel_indexes]
  106. if channel_indexes is None:
  107. channel_indexes = np.arange(self.header['signal_channels'].size)
  108. ids = self.header['signal_channels']['id'].tolist()
  109. channel_ids = [ids.index(c) for c in channel_indexes]
  110. raw_signals = self._raw_signals[seg_index][slice(i_start, i_stop), channel_ids]
  111. return raw_signals
  112. AnalysisDescription = [
  113. ('RecordStatus', '8s'),
  114. ('RecordType', '4s'),
  115. ('GroupNumber', 'f'),
  116. ('TimeRecorded', 'f'),
  117. ('SamplingInterval', 'f'),
  118. ('VMax', '8f'),
  119. ]
  120. class HeaderReader():
  121. def __init__(self, fid, description):
  122. self.fid = fid
  123. self.description = description
  124. def read_f(self, offset=0):
  125. self.fid.seek(offset)
  126. d = {}
  127. for key, fmt in self.description:
  128. val = struct.unpack(fmt, self.fid.read(struct.calcsize(fmt)))
  129. if len(val) == 1:
  130. val = val[0]
  131. else:
  132. val = list(val)
  133. d[key] = val
  134. return d