Scheduled service maintenance on November 22


On Friday, November 22, 2024, between 06:00 CET and 18:00 CET, GIN services will undergo planned maintenance. Extended service interruptions should be expected. We will try to keep downtimes to a minimum, but recommend that users avoid critical tasks, large data uploads, or DOI requests during this time.

We apologize for any inconvenience.

brainvisionio.py 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159
  1. # -*- coding: utf-8 -*-
  2. """
  3. Class for reading data from BrainVision product.
  4. This code was originally made by L. Pezard (2010), modified B. Burle and
  5. S. More.
  6. Supported : Read
  7. Author: sgarcia
  8. """
  9. import os
  10. import re
  11. import numpy as np
  12. import quantities as pq
  13. from neo.io.baseio import BaseIO
  14. from neo.core import Segment, AnalogSignal, Event
  15. class BrainVisionIO(BaseIO):
  16. """
  17. Class for reading/writing data from BrainVision products (brainAmp,
  18. brain analyser...)
  19. Usage:
  20. >>> from neo import io
  21. >>> r = io.BrainVisionIO( filename = 'File_brainvision_1.eeg')
  22. >>> seg = r.read_segment(lazy = False, cascade = True,)
  23. """
  24. is_readable = True
  25. is_writable = False
  26. supported_objects = [Segment, AnalogSignal, Event]
  27. readable_objects = [Segment]
  28. writeable_objects = []
  29. has_header = False
  30. is_streameable = False
  31. read_params = {Segment: []}
  32. write_params = {Segment: []}
  33. name = None
  34. extensions = ['vhdr']
  35. mode = 'file'
  36. def __init__(self, filename=None):
  37. """
  38. This class read/write a elan based file.
  39. **Arguments**
  40. filename : the filename to read or write
  41. """
  42. BaseIO.__init__(self)
  43. self.filename = filename
  44. def read_segment(self, lazy=False, cascade=True):
  45. # # Read header file (vhdr)
  46. header = read_brain_soup(self.filename)
  47. assert header['Common Infos'][
  48. 'DataFormat'] == 'BINARY', NotImplementedError
  49. assert header['Common Infos'][
  50. 'DataOrientation'] == 'MULTIPLEXED', NotImplementedError
  51. nb_channel = int(header['Common Infos']['NumberOfChannels'])
  52. sampling_rate = 1.e6 / float(
  53. header['Common Infos']['SamplingInterval']) * pq.Hz
  54. fmt = header['Binary Infos']['BinaryFormat']
  55. fmts = { 'INT_16':np.int16, 'INT_32':np.int32, 'IEEE_FLOAT_32':np.float32,}
  56. assert fmt in fmts, NotImplementedError
  57. dt = fmts[fmt]
  58. seg = Segment(file_origin=os.path.basename(self.filename))
  59. if not cascade:
  60. return seg
  61. # read binary
  62. if not lazy:
  63. binary_file = os.path.splitext(self.filename)[0] + '.eeg'
  64. sigs = np.memmap(binary_file, dt, 'r', ).astype('f')
  65. n = int(sigs.size / nb_channel)
  66. sigs = sigs[:n * nb_channel]
  67. sigs = sigs.reshape(n, nb_channel)
  68. for c in range(nb_channel):
  69. name, ref, res, units = header['Channel Infos'][
  70. 'Ch%d' % (c + 1,)].split(',')
  71. units = pq.Quantity(1, units.replace('µ', 'u'))
  72. if lazy:
  73. signal = [] * units
  74. else:
  75. signal = sigs[:,c]*units
  76. if dt == np.int16 or dt == np.int32:
  77. signal *= np.float(res)
  78. anasig = AnalogSignal(signal = signal,
  79. channel_index = c,
  80. name = name,
  81. sampling_rate = sampling_rate,
  82. )
  83. if lazy:
  84. anasig.lazy_shape = -1
  85. seg.analogsignals.append(anasig)
  86. # read marker
  87. marker_file = os.path.splitext(self.filename)[0] + '.vmrk'
  88. all_info = read_brain_soup(marker_file)['Marker Infos']
  89. all_types = []
  90. times = []
  91. labels = []
  92. for i in range(len(all_info)):
  93. type_, label, pos, size, channel = all_info[
  94. 'Mk%d' % (i + 1,)].split(',')[:5]
  95. all_types.append(type_)
  96. times.append(float(pos) / sampling_rate.magnitude)
  97. labels.append(label)
  98. all_types = np.array(all_types)
  99. times = np.array(times) * pq.s
  100. labels = np.array(labels, dtype='S')
  101. for type_ in np.unique(all_types):
  102. ind = type_ == all_types
  103. if lazy:
  104. ea = Event(name=str(type_))
  105. ea.lazy_shape = -1
  106. else:
  107. ea = Event(
  108. times=times[ind], labels=labels[ind], name=str(type_))
  109. seg.events.append(ea)
  110. seg.create_many_to_one_relationship()
  111. return seg
  112. def read_brain_soup(filename):
  113. section = None
  114. all_info = {}
  115. for line in open(filename, 'rU'):
  116. line = line.strip('\n').strip('\r')
  117. if line.startswith('['):
  118. section = re.findall('\[([\S ]+)\]', line)[0]
  119. all_info[section] = {}
  120. continue
  121. if line.startswith(';'):
  122. continue
  123. if '=' in line and len(line.split('=')) == 2:
  124. k, v = line.split('=')
  125. all_info[section][k] = v
  126. return all_info