elanrawio.py 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231
  1. """
  2. Class for reading data from Elan.
  3. Elan is software for studying time-frequency maps of EEG data.
  4. Elan is developed in Lyon, France, at INSERM U821
  5. https://elan.lyon.inserm.fr
  6. An Elan dataset is separated into 3 files :
  7. - .eeg raw data file
  8. - .eeg.ent header file
  9. - .eeg.pos event file
  10. Author: Samuel Garcia
  11. """
  12. from .baserawio import (BaseRawIO, _signal_channel_dtype, _unit_channel_dtype,
  13. _event_channel_dtype)
  14. import numpy as np
  15. import datetime
  16. import os
  17. import re
  18. import io
  19. class ElanRawIO(BaseRawIO):
  20. extensions = ['eeg']
  21. rawmode = 'one-file'
  22. def __init__(self, filename=''):
  23. BaseRawIO.__init__(self)
  24. self.filename = filename
  25. def _parse_header(self):
  26. with open(self.filename + '.ent', mode='rt', encoding='ascii', newline=None) as f:
  27. # version
  28. version = f.readline()[:-1]
  29. assert version in ['V2', 'V3'], 'Read only V2 or V3 .eeg.ent files. %s given' % version
  30. # info
  31. info1 = f.readline()[:-1]
  32. info2 = f.readline()[:-1]
  33. # strange 2 line for datetime
  34. # line1
  35. l = f.readline()
  36. r1 = re.findall(r'(\d+)-(\d+)-(\d+) (\d+):(\d+):(\d+)', l)
  37. r2 = re.findall(r'(\d+):(\d+):(\d+)', l)
  38. r3 = re.findall(r'(\d+)-(\d+)-(\d+)', l)
  39. YY, MM, DD, hh, mm, ss = (None,) * 6
  40. if len(r1) != 0:
  41. DD, MM, YY, hh, mm, ss = r1[0]
  42. elif len(r2) != 0:
  43. hh, mm, ss = r2[0]
  44. elif len(r3) != 0:
  45. DD, MM, YY = r3[0]
  46. # line2
  47. l = f.readline()
  48. r1 = re.findall(r'(\d+)-(\d+)-(\d+) (\d+):(\d+):(\d+)', l)
  49. r2 = re.findall(r'(\d+):(\d+):(\d+)', l)
  50. r3 = re.findall(r'(\d+)-(\d+)-(\d+)', l)
  51. if len(r1) != 0:
  52. DD, MM, YY, hh, mm, ss = r1[0]
  53. elif len(r2) != 0:
  54. hh, mm, ss = r2[0]
  55. elif len(r3) != 0:
  56. DD, MM, YY = r3[0]
  57. try:
  58. fulldatetime = datetime.datetime(int(YY), int(MM), int(DD),
  59. int(hh), int(mm), int(ss))
  60. except:
  61. fulldatetime = None
  62. l = f.readline()
  63. l = f.readline()
  64. l = f.readline()
  65. # sampling rate sample
  66. l = f.readline()
  67. self._sampling_rate = 1. / float(l)
  68. # nb channel
  69. l = f.readline()
  70. nb_channel = int(l) - 2
  71. channel_infos = [{} for c in range(nb_channel + 2)]
  72. # channel label
  73. for c in range(nb_channel + 2):
  74. channel_infos[c]['label'] = f.readline()[:-1]
  75. # channel kind
  76. for c in range(nb_channel + 2):
  77. channel_infos[c]['kind'] = f.readline()[:-1]
  78. # channel unit
  79. for c in range(nb_channel + 2):
  80. channel_infos[c]['units'] = f.readline()[:-1]
  81. # range for gain and offset
  82. for c in range(nb_channel + 2):
  83. channel_infos[c]['min_physic'] = float(f.readline()[:-1])
  84. for c in range(nb_channel + 2):
  85. channel_infos[c]['max_physic'] = float(f.readline()[:-1])
  86. for c in range(nb_channel + 2):
  87. channel_infos[c]['min_logic'] = float(f.readline()[:-1])
  88. for c in range(nb_channel + 2):
  89. channel_infos[c]['max_logic'] = float(f.readline()[:-1])
  90. # info filter
  91. info_filter = []
  92. for c in range(nb_channel + 2):
  93. channel_infos[c]['info_filter'] = f.readline()[:-1]
  94. n = int(round(np.log(channel_infos[0]['max_logic'] -
  95. channel_infos[0]['min_logic']) / np.log(2)) / 8)
  96. sig_dtype = np.dtype('>i' + str(n))
  97. sig_channels = []
  98. for c, chan_info in enumerate(channel_infos[:-2]):
  99. chan_name = chan_info['label']
  100. chan_id = c
  101. gain = (chan_info['max_physic'] - chan_info['min_physic']) / \
  102. (chan_info['max_logic'] - chan_info['min_logic'])
  103. offset = - chan_info['min_logic'] * gain + chan_info['min_physic']
  104. gourp_id = 0
  105. sig_channels.append((chan_name, chan_id, self._sampling_rate, sig_dtype,
  106. chan_info['units'], gain, offset, gourp_id))
  107. sig_channels = np.array(sig_channels, dtype=_signal_channel_dtype)
  108. # raw data
  109. self._raw_signals = np.memmap(self.filename, dtype=sig_dtype, mode='r',
  110. offset=0).reshape(-1, nb_channel + 2)
  111. self._raw_signals = self._raw_signals[:, :-2]
  112. # triggers
  113. with open(self.filename + '.pos', mode='rt', encoding='ascii', newline=None) as f:
  114. self._raw_event_timestamps = []
  115. self._event_labels = []
  116. self._reject_codes = []
  117. for l in f.readlines():
  118. r = re.findall(r' *(\d+) *(\d+) *(\d+) *', l)
  119. self._raw_event_timestamps.append(int(r[0][0]))
  120. self._event_labels.append(str(r[0][1]))
  121. self._reject_codes.append(str(r[0][2]))
  122. self._raw_event_timestamps = np.array(self._raw_event_timestamps, dtype='int64')
  123. self._event_labels = np.array(self._event_labels, dtype='U')
  124. self._reject_codes = np.array(self._reject_codes, dtype='U')
  125. event_channels = []
  126. event_channels.append(('Trigger', '', 'event'))
  127. event_channels = np.array(event_channels, dtype=_event_channel_dtype)
  128. # No spikes
  129. unit_channels = []
  130. unit_channels = np.array(unit_channels, dtype=_unit_channel_dtype)
  131. # fille into header dict
  132. self.header = {}
  133. self.header['nb_block'] = 1
  134. self.header['nb_segment'] = [1]
  135. self.header['signal_channels'] = sig_channels
  136. self.header['unit_channels'] = unit_channels
  137. self.header['event_channels'] = event_channels
  138. # insert some annotation at some place
  139. self._generate_minimal_annotations()
  140. extra_info = dict(rec_datetime=fulldatetime, elan_version=version,
  141. info1=info1, info2=info2)
  142. for obj_name in ('blocks', 'segments'):
  143. self._raw_annotate(obj_name, **extra_info)
  144. for c in range(nb_channel):
  145. d = channel_infos[c]
  146. self._raw_annotate('signals', chan_index=c, info_filter=d['info_filter'])
  147. self._raw_annotate('signals', chan_index=c, kind=d['kind'])
  148. self._raw_annotate('events', chan_index=0, reject_codes=self._reject_codes)
  149. def _source_name(self):
  150. return self.filename
  151. def _segment_t_start(self, block_index, seg_index):
  152. return 0.
  153. def _segment_t_stop(self, block_index, seg_index):
  154. t_stop = self._raw_signals.shape[0] / self._sampling_rate
  155. return t_stop
  156. def _get_signal_size(self, block_index, seg_index, channel_indexes=None):
  157. return self._raw_signals.shape[0]
  158. def _get_signal_t_start(self, block_index, seg_index, channel_indexes=None):
  159. return 0.
  160. def _get_analogsignal_chunk(self, block_index, seg_index, i_start, i_stop, channel_indexes):
  161. if channel_indexes is None:
  162. channel_indexes = slice(None)
  163. raw_signals = self._raw_signals[slice(i_start, i_stop), channel_indexes]
  164. return raw_signals
  165. def _spike_count(self, block_index, seg_index, unit_index):
  166. return 0
  167. def _event_count(self, block_index, seg_index, event_channel_index):
  168. return self._raw_event_timestamps.size
  169. def _get_event_timestamps(self, block_index, seg_index, event_channel_index, t_start, t_stop):
  170. timestamp = self._raw_event_timestamps
  171. labels = self._event_labels
  172. durations = None
  173. if t_start is not None:
  174. keep = timestamp >= int(t_start * self._sampling_rate)
  175. timestamp = timestamp[keep]
  176. labels = labels[keep]
  177. if t_stop is not None:
  178. keep = timestamp <= int(t_stop * self._sampling_rate)
  179. timestamp = timestamp[keep]
  180. labels = labels[keep]
  181. return timestamp, durations, labels
  182. def _rescale_event_timestamp(self, event_timestamps, dtype):
  183. event_times = event_timestamps.astype(dtype) / self._sampling_rate
  184. return event_times