Scheduled service maintenance on November 22


On Friday, November 22, 2024, between 06:00 CET and 18:00 CET, GIN services will undergo planned maintenance. Extended service interruptions should be expected. We will try to keep downtimes to a minimum, but recommend that users avoid critical tasks, large data uploads, or DOI requests during this time.

We apologize for any inconvenience.

rawmcsrawio.py 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
  1. # -*- coding: utf-8 -*-
  2. """
  3. Class for reading data from "Raw" Multi Channel System (MCS) format.
  4. This format is NOT the native MCS format (*.mcd).
  5. This format is a raw format with an internal binary header exported by the
  6. "MC_DataTool binary conversion" with the option header selected.
  7. The internal header contains sampling rate, channel names, gain and units.
  8. Not so bad: everything that Neo needs, so this IO is without parameters.
  9. If some MCS customers read this you should lobby to get the real specification
  10. of the real MCS format (.mcd), then an IO module for the native MCS format
  11. could be written instead of this ersatz.
  12. Author: Samuel Garcia
  13. """
  14. from __future__ import unicode_literals, print_function, division, absolute_import
  15. from .baserawio import (BaseRawIO, _signal_channel_dtype, _unit_channel_dtype,
  16. _event_channel_dtype)
  17. import numpy as np
  18. import os
  19. import sys
  20. class RawMCSRawIO(BaseRawIO):
  21. extensions = ['raw']
  22. rawmode = 'one-file'
  23. def __init__(self, filename=''):
  24. BaseRawIO.__init__(self)
  25. self.filename = filename
  26. def _source_name(self):
  27. return self.filename
  28. def _parse_header(self):
  29. self._info = info = parse_mcs_raw_header(self.filename)
  30. self.dtype = 'uint16'
  31. self.sampling_rate = info['sampling_rate']
  32. self.nb_channel = len(info['channel_names'])
  33. self._raw_signals = np.memmap(self.filename, dtype=self.dtype, mode='r',
  34. offset=info['header_size']).reshape(-1, self.nb_channel)
  35. sig_channels = []
  36. for c in range(self.nb_channel):
  37. chan_id = c
  38. group_id = 0
  39. sig_channels.append((info['channel_names'][c], chan_id, self.sampling_rate,
  40. self.dtype, info['signal_units'], info['signal_gain'],
  41. info['signal_offset'], group_id))
  42. sig_channels = np.array(sig_channels, dtype=_signal_channel_dtype)
  43. # No events
  44. event_channels = []
  45. event_channels = np.array(event_channels, dtype=_event_channel_dtype)
  46. # No spikes
  47. unit_channels = []
  48. unit_channels = np.array(unit_channels, dtype=_unit_channel_dtype)
  49. # fille into header dict
  50. self.header = {}
  51. self.header['nb_block'] = 1
  52. self.header['nb_segment'] = [1]
  53. self.header['signal_channels'] = sig_channels
  54. self.header['unit_channels'] = unit_channels
  55. self.header['event_channels'] = event_channels
  56. # insert some annotation at some place
  57. self._generate_minimal_annotations()
  58. def _segment_t_start(self, block_index, seg_index):
  59. return 0.
  60. def _segment_t_stop(self, block_index, seg_index):
  61. t_stop = self._raw_signals.shape[0] / self.sampling_rate
  62. return t_stop
  63. def _get_signal_size(self, block_index, seg_index, channel_indexes):
  64. return self._raw_signals.shape[0]
  65. def _get_signal_t_start(self, block_index, seg_index, channel_indexes):
  66. return 0.
  67. def _get_analogsignal_chunk(self, block_index, seg_index, i_start, i_stop, channel_indexes):
  68. if channel_indexes is None:
  69. channel_indexes = slice(None)
  70. raw_signals = self._raw_signals[slice(i_start, i_stop), channel_indexes]
  71. return raw_signals
  72. def parse_mcs_raw_header(filename):
  73. """
  74. This is a from-scratch implementation, with some inspiration
  75. (but no code) taken from the following files:
  76. https://github.com/spyking-circus/spyking-circus/blob/master/circus/files/mcs_raw_binary.py
  77. https://github.com/jeffalstott/Multi-Channel-Systems-Import/blob/master/MCS.py
  78. """
  79. MAX_HEADER_SIZE = 5000
  80. with open(filename, mode='rb') as f:
  81. raw_header = f.read(MAX_HEADER_SIZE)
  82. header_size = raw_header.find(b'EOH')
  83. assert header_size != -1, 'Error in reading raw mcs header'
  84. header_size = header_size + 5
  85. raw_header = raw_header[:header_size]
  86. raw_header = raw_header.replace(b'\r', b'')
  87. info = {}
  88. info['header_size'] = header_size
  89. def parse_line(line, key):
  90. if key + b' = ' in line:
  91. v = line.replace(key, b'').replace(b' ', b'').replace(b'=', b'')
  92. return v
  93. keys = (b'Sample rate', b'ADC zero', b'ADC zero', b'El', b'Streams')
  94. for line in raw_header.split(b'\n'):
  95. for key in keys:
  96. v = parse_line(line, key)
  97. if v is None:
  98. continue
  99. if key == b'Sample rate':
  100. info['sampling_rate'] = float(v)
  101. elif key == b'ADC zero':
  102. info['adc_zero'] = int(v)
  103. elif key == b'El':
  104. v = v.decode('Windows-1252')
  105. v = v.replace('/AD', '')
  106. split_pos = 0
  107. while v[split_pos] in '1234567890.':
  108. split_pos += 1
  109. if split_pos == len(v):
  110. split_pos = None
  111. break
  112. assert split_pos is not None, 'Impossible to find units and scaling'
  113. info['signal_gain'] = float(v[:split_pos])
  114. info['signal_units'] = v[split_pos:].replace(u'µ', u'u')
  115. info['signal_offset'] = -info['signal_gain'] * info['adc_zero']
  116. elif key == b'Streams':
  117. info['channel_names'] = v.decode('Windows-1252').split(';')
  118. return info