1
1

brainvisionrawio.py 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201
  1. """
  2. Class for reading data from BrainVision product.
  3. This code was originally written by L. Pezard (2010) and modified by B. Burle and
  4. S. More.
  5. Author: Samuel Garcia
  6. """
  7. from .baserawio import (BaseRawIO, _signal_channel_dtype, _unit_channel_dtype,
  8. _event_channel_dtype)
  9. import numpy as np
  10. import datetime
  11. import os
  12. import re
  class BrainVisionRawIO(BaseRawIO):
      """
      Raw-level reader for BrainVision recordings.

      A recording is opened through its ``.vhdr`` header file. The header
      names a binary data file and a marker file, both looked up in the
      header's own directory.

      Only ``DataFormat=BINARY`` with ``DataOrientation=MULTIPLEXED`` is
      handled, and the sample format must be one of ``INT_16``, ``INT_32``
      or ``IEEE_FLOAT_32``.

      The file is exposed as one block holding one segment. Markers are
      grouped into one event channel per distinct marker type. There are
      no spike/unit channels.
      """
      extensions = ['vhdr']
      rawmode = 'one-file'

      def __init__(self, filename=''):
          """
          :param filename: path of the ``.vhdr`` header file.
          """
          BaseRawIO.__init__(self)
          self.filename = filename

      def _parse_header(self):
          """
          Read the header and marker files, memory-map the signal file and
          fill ``self.header``.

          :raises NotImplementedError: if the data format, the data
              orientation or the binary sample format is not supported.
          """
          # Read header file (vhdr)
          vhdr_header = read_brainvsion_soup(self.filename)
          common = vhdr_header['Common Infos']

          # The marker and data files are resolved next to the header.
          # Joining on the directory (rather than str.replace on the full
          # path) keeps directory names that contain the header's basename
          # intact.
          dirname = os.path.dirname(self.filename)
          marker_filename = os.path.join(dirname, common['MarkerFile'])
          binary_filename = os.path.join(dirname, common['DataFile'])

          # Explicit raises: an ``assert`` would vanish under ``python -O``.
          if common['DataFormat'] != 'BINARY':
              raise NotImplementedError(
                  'Unsupported DataFormat: %s' % common['DataFormat'])
          if common['DataOrientation'] != 'MULTIPLEXED':
              raise NotImplementedError(
                  'Unsupported DataOrientation: %s' % common['DataOrientation'])

          nb_channel = int(common['NumberOfChannels'])
          # SamplingInterval is divided into 1e6 to get the rate
          # (i.e. it is treated as microseconds).
          self._sampling_rate = 1.e6 / float(common['SamplingInterval'])

          fmt = vhdr_header['Binary Infos']['BinaryFormat']
          fmts = {'INT_16': np.int16, 'INT_32': np.int32, 'IEEE_FLOAT_32': np.float32, }
          if fmt not in fmts:
              raise NotImplementedError('Unsupported BinaryFormat: %s' % fmt)
          sig_dtype = fmts[fmt]

          # raw signals memmap
          sigs = np.memmap(binary_filename, dtype=sig_dtype, mode='r', offset=0)
          # Keep only whole frames (one sample per channel); a trailing
          # partial frame would make the reshape below fail.
          nb_frame = sigs.size // nb_channel
          sigs = sigs[:nb_frame * nb_channel]
          self._raw_signals = sigs.reshape(-1, nb_channel)

          sig_channels = []
          channel_infos = vhdr_header['Channel Infos']
          is_integer = sig_dtype in (np.int16, np.int32)
          for c in range(nb_channel):
              chan_id = c + 1
              # The key is tried as 'Ch<n>' first, then as 'ch<n>'.
              try:
                  channel_desc = channel_infos['Ch%d' % (chan_id,)]
              except KeyError:
                  channel_desc = channel_infos['ch%d' % (chan_id,)]

              # Expected layout: name, reference, resolution, unit.
              # Pad so that a line with trailing fields left out still
              # unpacks (the missing fields become empty strings).
              fields = channel_desc.split(',')
              fields += [''] * (4 - len(fields))
              name, _ref, res, units = fields[:4]
              units = units.replace('µ', 'u')

              if is_integer:
                  # An empty resolution is taken as 1.
                  # NOTE(review): confirm against the format specification.
                  gain = float(res) if res else 1.
              else:
                  gain = 1
              offset = 0
              group_id = 0
              sig_channels.append((name, chan_id, self._sampling_rate, sig_dtype,
                                   units, gain, offset, group_id))
          sig_channels = np.array(sig_channels, dtype=_signal_channel_dtype)

          # No spikes
          unit_channels = np.array([], dtype=_unit_channel_dtype)

          # read all markers in memory
          all_info = read_brainvsion_soup(marker_filename)['Marker Infos']
          ev_types = []
          ev_timestamps = []
          ev_labels = []
          # Markers are looked up as Mk1..MkN, N being the number of entries.
          for i in range(len(all_info)):
              fields = all_info['Mk%d' % (i + 1,)].split(',')
              # Only type, label and position are used; any further fields
              # are ignored.
              ev_type, ev_label, pos = fields[:3]
              ev_types.append(ev_type)
              ev_timestamps.append(int(pos))
              ev_labels.append(ev_label)
          ev_types = np.array(ev_types)
          ev_timestamps = np.array(ev_timestamps)
          ev_labels = np.array(ev_labels, dtype='U')

          # group them by types
          self._raw_events = []
          event_channels = []
          for ev_type in np.unique(ev_types):
              ind = (ev_types == ev_type)
              event_channels.append((ev_type, '', 'event'))
              self._raw_events.append((ev_timestamps[ind], ev_labels[ind]))
          event_channels = np.array(event_channels, dtype=_event_channel_dtype)

          # fill into header dict
          self.header = {}
          self.header['nb_block'] = 1
          self.header['nb_segment'] = [1]
          self.header['signal_channels'] = sig_channels
          self.header['unit_channels'] = unit_channels
          self.header['event_channels'] = event_channels

          self._generate_minimal_annotations()
          if 'Coordinates' in vhdr_header:
              for c in range(sig_channels.size):
                  coords = vhdr_header['Coordinates']['Ch{}'.format(c + 1)]
                  coords = [float(v) for v in coords.split(',')]
                  if coords[0] > 0.:
                      # if radius is 0 we do not have coordinates.
                      self.raw_annotations['signal_channels'][c]['coordinates'] = coords

      def _source_name(self):
          """Return the header file name given at construction."""
          return self.filename

      def _segment_t_start(self, block_index, seg_index):
          """The single segment always starts at 0."""
          return 0.

      def _segment_t_stop(self, block_index, seg_index):
          """Return the number of frames divided by the sampling rate."""
          t_stop = self._raw_signals.shape[0] / self._sampling_rate
          return t_stop

      ###
      def _get_signal_size(self, block_index, seg_index, channel_indexes):
          """Return the number of frames in the memory-mapped signal."""
          return self._raw_signals.shape[0]

      def _get_signal_t_start(self, block_index, seg_index, channel_indexes):
          """Signals always start at 0."""
          return 0.

      def _get_analogsignal_chunk(self, block_index, seg_index, i_start, i_stop, channel_indexes):
          """
          Return frames ``i_start:i_stop`` of the raw (unscaled) signals.

          ``channel_indexes=None`` selects every channel.
          """
          if channel_indexes is None:
              channel_indexes = slice(None)
          raw_signals = self._raw_signals[slice(i_start, i_stop), channel_indexes]
          return raw_signals

      ###
      def _spike_count(self, block_index, seg_index, unit_index):
          """There are no spikes, so the count is always 0."""
          return 0

      ###
      # event and epoch zone
      def _event_count(self, block_index, seg_index, event_channel_index):
          """Return how many markers belong to the given event channel."""
          all_timestamps, all_label = self._raw_events[event_channel_index]
          return all_timestamps.size

      def _get_event_timestamps(self, block_index, seg_index, event_channel_index, t_start, t_stop):
          """
          Return ``(timestamps, durations, labels)`` for one event channel.

          ``timestamps`` are the integer marker positions. ``t_start`` and
          ``t_stop`` are multiplied by the sampling rate before being
          compared with them, and both bounds are inclusive; ``None``
          disables the corresponding bound. ``durations`` is always
          ``None``.
          """
          timestamps, labels = self._raw_events[event_channel_index]

          if t_start is not None:
              keep = timestamps >= int(t_start * self._sampling_rate)
              timestamps = timestamps[keep]
              labels = labels[keep]

          if t_stop is not None:
              keep = timestamps <= int(t_stop * self._sampling_rate)
              timestamps = timestamps[keep]
              labels = labels[keep]

          durations = None

          return timestamps, durations, labels

      def _rescale_event_timestamp(self, event_timestamps, dtype):
          """Cast the marker positions to ``dtype`` and divide by the sampling rate."""
          event_times = event_timestamps.astype(dtype) / self._sampling_rate
          return event_times
  def read_brainvsion_soup(filename):
      """
      Parse a BrainVision INI-style text file into nested dictionaries.

      The result maps each ``[Section]`` name to a dict of the
      ``key=value`` entries found below it, with keys and values kept as
      raw strings. Lines starting with ``;`` are skipped, as are lines
      without ``=`` and ``key=value`` lines that appear before the first
      section header.

      :param filename: path of the text file to read.
      :return: ``{section_name: {key: value}}``.
      """
      try:
          with open(filename, 'r', encoding='utf8') as f:
              lines = f.readlines()
      except UnicodeDecodeError:
          # The file is not valid UTF-8 (e.g. a single-byte micro sign in
          # a unit field). Latin-1 can decode any byte sequence, so this
          # second attempt cannot fail on decoding.
          with open(filename, 'r', encoding='latin-1') as f:
              lines = f.readlines()

      section = None
      all_info = {}
      for line in lines:
          line = line.rstrip('\r\n')
          if line.startswith('['):
              found = re.search(r'\[([\S ]+)\]', line)
              if found is not None:
                  section = found.group(1)
                  all_info[section] = {}
              # A '[' line without a closing bracket is ignored.
              continue
          if line.startswith(';'):
              continue
          # Split on the first '=' only, so that values which themselves
          # contain '=' are kept instead of being dropped.
          k, sep, v = line.partition('=')
          if sep and section is not None:
              all_info[section][k] = v
      return all_info