rawmcsrawio.py 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153
  1. """
  2. Class for reading data from "Raw" Multi Channel System (MCS) format.
  3. This format is NOT the native MCS format (*.mcd).
  4. This format is a raw format with an internal binary header exported by the
  5. "MC_DataTool binary conversion" with the option header selected.
  6. The internal header contains sampling rate, channel names, gain and units.
  7. Not so bad: everything that Neo needs, so this IO is without parameters.
  8. If some MCS customers read this you should lobby to get the real specification
  9. of the real MCS format (.mcd), then an IO module for the native MCS format
  10. could be written instead of this ersatz.
  11. Author: Samuel Garcia
  12. """
  13. from .baserawio import (BaseRawIO, _signal_channel_dtype, _unit_channel_dtype,
  14. _event_channel_dtype)
  15. import numpy as np
  16. import os
  17. import sys
  18. class RawMCSRawIO(BaseRawIO):
  19. extensions = ['raw']
  20. rawmode = 'one-file'
  21. def __init__(self, filename=''):
  22. BaseRawIO.__init__(self)
  23. self.filename = filename
  24. def _source_name(self):
  25. return self.filename
  26. def _parse_header(self):
  27. self._info = info = parse_mcs_raw_header(self.filename)
  28. self.dtype = 'uint16'
  29. self.sampling_rate = info['sampling_rate']
  30. self.nb_channel = len(info['channel_names'])
  31. self._raw_signals = np.memmap(self.filename, dtype=self.dtype, mode='r',
  32. offset=info['header_size']).reshape(-1, self.nb_channel)
  33. sig_channels = []
  34. for c in range(self.nb_channel):
  35. chan_id = c
  36. group_id = 0
  37. sig_channels.append((info['channel_names'][c], chan_id, self.sampling_rate,
  38. self.dtype, info['signal_units'], info['signal_gain'],
  39. info['signal_offset'], group_id))
  40. sig_channels = np.array(sig_channels, dtype=_signal_channel_dtype)
  41. # No events
  42. event_channels = []
  43. event_channels = np.array(event_channels, dtype=_event_channel_dtype)
  44. # No spikes
  45. unit_channels = []
  46. unit_channels = np.array(unit_channels, dtype=_unit_channel_dtype)
  47. # fille into header dict
  48. self.header = {}
  49. self.header['nb_block'] = 1
  50. self.header['nb_segment'] = [1]
  51. self.header['signal_channels'] = sig_channels
  52. self.header['unit_channels'] = unit_channels
  53. self.header['event_channels'] = event_channels
  54. # insert some annotation at some place
  55. self._generate_minimal_annotations()
  56. def _segment_t_start(self, block_index, seg_index):
  57. return 0.
  58. def _segment_t_stop(self, block_index, seg_index):
  59. t_stop = self._raw_signals.shape[0] / self.sampling_rate
  60. return t_stop
  61. def _get_signal_size(self, block_index, seg_index, channel_indexes):
  62. return self._raw_signals.shape[0]
  63. def _get_signal_t_start(self, block_index, seg_index, channel_indexes):
  64. return 0.
  65. def _get_analogsignal_chunk(self, block_index, seg_index, i_start, i_stop, channel_indexes):
  66. if channel_indexes is None:
  67. channel_indexes = slice(None)
  68. raw_signals = self._raw_signals[slice(i_start, i_stop), channel_indexes]
  69. return raw_signals
  70. def parse_mcs_raw_header(filename):
  71. """
  72. This is a from-scratch implementation, with some inspiration
  73. (but no code) taken from the following files:
  74. https://github.com/spyking-circus/spyking-circus/blob/master/circus/files/mcs_raw_binary.py
  75. https://github.com/jeffalstott/Multi-Channel-Systems-Import/blob/master/MCS.py
  76. """
  77. MAX_HEADER_SIZE = 5000
  78. with open(filename, mode='rb') as f:
  79. raw_header = f.read(MAX_HEADER_SIZE)
  80. header_size = raw_header.find(b'EOH')
  81. assert header_size != -1, 'Error in reading raw mcs header'
  82. header_size = header_size + 5
  83. raw_header = raw_header[:header_size]
  84. raw_header = raw_header.replace(b'\r', b'')
  85. info = {}
  86. info['header_size'] = header_size
  87. def parse_line(line, key):
  88. if key + b' = ' in line:
  89. v = line.replace(key, b'').replace(b' ', b'').replace(b'=', b'')
  90. return v
  91. keys = (b'Sample rate', b'ADC zero', b'ADC zero', b'El', b'Streams')
  92. for line in raw_header.split(b'\n'):
  93. for key in keys:
  94. v = parse_line(line, key)
  95. if v is None:
  96. continue
  97. if key == b'Sample rate':
  98. info['sampling_rate'] = float(v)
  99. elif key == b'ADC zero':
  100. info['adc_zero'] = int(v)
  101. elif key == b'El':
  102. v = v.decode('Windows-1252')
  103. v = v.replace('/AD', '')
  104. split_pos = 0
  105. while v[split_pos] in '1234567890.':
  106. split_pos += 1
  107. if split_pos == len(v):
  108. split_pos = None
  109. break
  110. assert split_pos is not None, 'Impossible to find units and scaling'
  111. info['signal_gain'] = float(v[:split_pos])
  112. info['signal_units'] = v[split_pos:].replace('µ', 'u')
  113. info['signal_offset'] = -info['signal_gain'] * info['adc_zero']
  114. elif key == b'Streams':
  115. info['channel_names'] = v.decode('Windows-1252').split(';')
  116. return info