brainvisionrawio.py 6.7 KB

# -*- coding: utf-8 -*-
"""
Class for reading data from BrainVision product.

This code was originally made by L. Pezard (2010), modified by B. Burle and
S. More.

Author: Samuel Garcia
"""
from __future__ import unicode_literals, print_function, division, absolute_import

from .baserawio import (BaseRawIO, _signal_channel_dtype, _unit_channel_dtype,
                        _event_channel_dtype)

import numpy as np

import datetime
import os
import re
import io


class BrainVisionRawIO(BaseRawIO):
    """Reader for BrainVision recordings: a .vhdr text header plus the
    marker and binary data files it references.
    """
    extensions = ['vhdr']
    rawmode = 'one-file'

    def __init__(self, filename=''):
        BaseRawIO.__init__(self)
        self.filename = filename

    def _parse_header(self):
        # Read header file (vhdr)
        vhdr_header = read_brainvsion_soup(self.filename)

        bname = os.path.basename(self.filename)
        marker_filename = self.filename.replace(bname, vhdr_header['Common Infos']['MarkerFile'])
        binary_filename = self.filename.replace(bname, vhdr_header['Common Infos']['DataFile'])

        assert vhdr_header['Common Infos'][
            'DataFormat'] == 'BINARY', NotImplementedError
        assert vhdr_header['Common Infos'][
            'DataOrientation'] == 'MULTIPLEXED', NotImplementedError

        nb_channel = int(vhdr_header['Common Infos']['NumberOfChannels'])
        # SamplingInterval is given in microseconds
        sr = 1.e6 / float(vhdr_header['Common Infos']['SamplingInterval'])
        self._sampling_rate = sr

        fmt = vhdr_header['Binary Infos']['BinaryFormat']
        fmts = {'INT_16': np.int16, 'INT_32': np.int32, 'IEEE_FLOAT_32': np.float32, }
        assert fmt in fmts, NotImplementedError
        sig_dtype = fmts[fmt]

        # raw signals memmap
        sigs = np.memmap(binary_filename, dtype=sig_dtype, mode='r', offset=0)
        if sigs.size % nb_channel != 0:
            # drop the trailing partial frame so the buffer reshapes cleanly
            sigs = sigs[:-(sigs.size % nb_channel)]
        self._raw_signals = sigs.reshape(-1, nb_channel)

        sig_channels = []
        for c in range(nb_channel):
            name, ref, res, units = vhdr_header['Channel Infos'][
                'Ch%d' % (c + 1,)].split(',')
            units = units.replace('µ', 'u')
            chan_id = c + 1
            if sig_dtype == np.int16 or sig_dtype == np.int32:
                gain = float(res)
            else:
                gain = 1
            offset = 0
            group_id = 0
            sig_channels.append((name, chan_id, self._sampling_rate, sig_dtype,
                                 units, gain, offset, group_id))
        sig_channels = np.array(sig_channels, dtype=_signal_channel_dtype)

        # No spikes
        unit_channels = []
        unit_channels = np.array(unit_channels, dtype=_unit_channel_dtype)

        # read all markers in memory
        all_info = read_brainvsion_soup(marker_filename)['Marker Infos']
        ev_types = []
        ev_timestamps = []
        ev_labels = []
        for i in range(len(all_info)):
            ev_type, ev_label, pos, size, channel = all_info[
                'Mk%d' % (i + 1,)].split(',')[:5]
            ev_types.append(ev_type)
            ev_timestamps.append(int(pos))
            ev_labels.append(ev_label)
        ev_types = np.array(ev_types)
        ev_timestamps = np.array(ev_timestamps)
        ev_labels = np.array(ev_labels, dtype='U')

        # group them by types
        self._raw_events = []
        event_channels = []
        for c, ev_type in enumerate(np.unique(ev_types)):
            ind = (ev_types == ev_type)
            event_channels.append((ev_type, '', 'event'))
            self._raw_events.append((ev_timestamps[ind], ev_labels[ind]))
        event_channels = np.array(event_channels, dtype=_event_channel_dtype)

        # fill into header dict
        self.header = {}
        self.header['nb_block'] = 1
        self.header['nb_segment'] = [1]
        self.header['signal_channels'] = sig_channels
        self.header['unit_channels'] = unit_channels
        self.header['event_channels'] = event_channels

        self._generate_minimal_annotations()

        if 'Coordinates' in vhdr_header:
            # the Coordinates section is optional in .vhdr files
            for c in range(sig_channels.size):
                coords = vhdr_header['Coordinates']['Ch{}'.format(c + 1)]
                coords = [float(v) for v in coords.split(',')]
                if coords[0] > 0.:
                    # if radius is 0 we do not have coordinates.
                    self.raw_annotations['signal_channels'][c]['coordinates'] = coords

    def _source_name(self):
        return self.filename

    def _segment_t_start(self, block_index, seg_index):
        return 0.

    def _segment_t_stop(self, block_index, seg_index):
        t_stop = self._raw_signals.shape[0] / self._sampling_rate
        return t_stop

    ###
    def _get_signal_size(self, block_index, seg_index, channel_indexes):
        return self._raw_signals.shape[0]

    def _get_signal_t_start(self, block_index, seg_index, channel_indexes):
        return 0.

    def _get_analogsignal_chunk(self, block_index, seg_index, i_start, i_stop, channel_indexes):
        if channel_indexes is None:
            channel_indexes = slice(None)
        raw_signals = self._raw_signals[slice(i_start, i_stop), channel_indexes]
        return raw_signals

    ###
    def _spike_count(self, block_index, seg_index, unit_index):
        return 0

    ###
    # event and epoch zone
    def _event_count(self, block_index, seg_index, event_channel_index):
        all_timestamps, all_label = self._raw_events[event_channel_index]
        return all_timestamps.size

    def _get_event_timestamps(self, block_index, seg_index, event_channel_index, t_start, t_stop):
        timestamps, labels = self._raw_events[event_channel_index]

        if t_start is not None:
            keep = timestamps >= int(t_start * self._sampling_rate)
            timestamps = timestamps[keep]
            labels = labels[keep]

        if t_stop is not None:
            keep = timestamps <= int(t_stop * self._sampling_rate)
            timestamps = timestamps[keep]
            labels = labels[keep]

        durations = None
        return timestamps, durations, labels

    def _rescale_event_timestamp(self, event_timestamps, dtype):
        event_times = event_timestamps.astype(dtype) / self._sampling_rate
        return event_times


def read_brainvsion_soup(filename):
    # Parse the INI-like .vhdr/.vmrk text into a {section: {key: value}} dict.
    with io.open(filename, 'r', encoding='utf8') as f:
        section = None
        all_info = {}
        for line in f:
            line = line.strip('\n').strip('\r')
            if line.startswith('['):
                section = re.findall(r'\[([\S ]+)\]', line)[0]
                all_info[section] = {}
                continue
            if line.startswith(';'):
                continue
            if '=' in line and len(line.split('=')) == 2:
                k, v = line.split('=')
                all_info[section][k] = v
    return all_info
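

# ---------------------------------------------------------------------------
# Minimal usage sketch (not part of the reader itself): how this class is
# typically driven through the public BaseRawIO interface.  The file path is
# a placeholder, and parse_header() / get_analogsignal_chunk() are assumed to
# be the generic helpers provided by BaseRawIO in neo.rawio.
if __name__ == '__main__':
    reader = BrainVisionRawIO(filename='/path/to/recording.vhdr')  # placeholder path
    reader.parse_header()  # wraps _parse_header() and builds annotations

    # channel table assembled in _parse_header()
    sig_channels = reader.header['signal_channels']
    print(sig_channels['name'])

    # read the first second of raw (unscaled) samples for all channels
    n = int(reader._sampling_rate)
    raw = reader.get_analogsignal_chunk(block_index=0, seg_index=0,
                                        i_start=0, i_stop=n,
                                        channel_indexes=None)
    print(raw.shape, raw.dtype)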