1
1

elanrawio.py 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233
  1. # -*- coding: utf-8 -*-
  2. """
  3. Class for reading data from Elan.
  4. Elan is software for studying time-frequency maps of EEG data.
  5. Elan is developed in Lyon, France, at INSERM U821
  6. https://elan.lyon.inserm.fr
  7. An Elan dataset is separated into 3 files :
  8. - .eeg raw data file
  9. - .eeg.ent hearder file
  10. - .eeg.pos event file
  11. Author: Samuel Garcia
  12. """
  13. from __future__ import unicode_literals, print_function, division, absolute_import
  14. from .baserawio import (BaseRawIO, _signal_channel_dtype, _unit_channel_dtype,
  15. _event_channel_dtype)
  16. import numpy as np
  17. import datetime
  18. import os
  19. import re
  20. import io
  21. class ElanRawIO(BaseRawIO):
  22. extensions = ['eeg']
  23. rawmode = 'one-file'
  24. def __init__(self, filename=''):
  25. BaseRawIO.__init__(self)
  26. self.filename = filename
  27. def _parse_header(self):
  28. with io.open(self.filename + '.ent', mode='rt', encoding='ascii', newline=None) as f:
  29. # version
  30. version = f.readline()[:-1]
  31. assert version in ['V2', 'V3'], 'Read only V2 or V3 .eeg.ent files. %s given' % version
  32. # info
  33. info1 = f.readline()[:-1]
  34. info2 = f.readline()[:-1]
  35. # strange 2 line for datetime
  36. # line1
  37. l = f.readline()
  38. r1 = re.findall(r'(\d+)-(\d+)-(\d+) (\d+):(\d+):(\d+)', l)
  39. r2 = re.findall(r'(\d+):(\d+):(\d+)', l)
  40. r3 = re.findall(r'(\d+)-(\d+)-(\d+)', l)
  41. YY, MM, DD, hh, mm, ss = (None,) * 6
  42. if len(r1) != 0:
  43. DD, MM, YY, hh, mm, ss = r1[0]
  44. elif len(r2) != 0:
  45. hh, mm, ss = r2[0]
  46. elif len(r3) != 0:
  47. DD, MM, YY = r3[0]
  48. # line2
  49. l = f.readline()
  50. r1 = re.findall(r'(\d+)-(\d+)-(\d+) (\d+):(\d+):(\d+)', l)
  51. r2 = re.findall(r'(\d+):(\d+):(\d+)', l)
  52. r3 = re.findall(r'(\d+)-(\d+)-(\d+)', l)
  53. if len(r1) != 0:
  54. DD, MM, YY, hh, mm, ss = r1[0]
  55. elif len(r2) != 0:
  56. hh, mm, ss = r2[0]
  57. elif len(r3) != 0:
  58. DD, MM, YY = r3[0]
  59. try:
  60. fulldatetime = datetime.datetime(int(YY), int(MM), int(DD),
  61. int(hh), int(mm), int(ss))
  62. except:
  63. fulldatetime = None
  64. l = f.readline()
  65. l = f.readline()
  66. l = f.readline()
  67. # sampling rate sample
  68. l = f.readline()
  69. self._sampling_rate = 1. / float(l)
  70. # nb channel
  71. l = f.readline()
  72. nb_channel = int(l) - 2
  73. channel_infos = [{} for c in range(nb_channel + 2)]
  74. # channel label
  75. for c in range(nb_channel + 2):
  76. channel_infos[c]['label'] = f.readline()[:-1]
  77. # channel kind
  78. for c in range(nb_channel + 2):
  79. channel_infos[c]['kind'] = f.readline()[:-1]
  80. # channel unit
  81. for c in range(nb_channel + 2):
  82. channel_infos[c]['units'] = f.readline()[:-1]
  83. # range for gain and offset
  84. for c in range(nb_channel + 2):
  85. channel_infos[c]['min_physic'] = float(f.readline()[:-1])
  86. for c in range(nb_channel + 2):
  87. channel_infos[c]['max_physic'] = float(f.readline()[:-1])
  88. for c in range(nb_channel + 2):
  89. channel_infos[c]['min_logic'] = float(f.readline()[:-1])
  90. for c in range(nb_channel + 2):
  91. channel_infos[c]['max_logic'] = float(f.readline()[:-1])
  92. # info filter
  93. info_filter = []
  94. for c in range(nb_channel + 2):
  95. channel_infos[c]['info_filter'] = f.readline()[:-1]
  96. n = int(round(np.log(channel_infos[0]['max_logic'] -
  97. channel_infos[0]['min_logic']) / np.log(2)) / 8)
  98. sig_dtype = np.dtype('>i' + str(n))
  99. sig_channels = []
  100. for c, chan_info in enumerate(channel_infos[:-2]):
  101. chan_name = chan_info['label']
  102. chan_id = c
  103. gain = (chan_info['max_physic'] - chan_info['min_physic']) / \
  104. (chan_info['max_logic'] - chan_info['min_logic'])
  105. offset = - chan_info['min_logic'] * gain + chan_info['min_physic']
  106. gourp_id = 0
  107. sig_channels.append((chan_name, chan_id, self._sampling_rate, sig_dtype,
  108. chan_info['units'], gain, offset, gourp_id))
  109. sig_channels = np.array(sig_channels, dtype=_signal_channel_dtype)
  110. # raw data
  111. self._raw_signals = np.memmap(self.filename, dtype=sig_dtype, mode='r',
  112. offset=0).reshape(-1, nb_channel + 2)
  113. self._raw_signals = self._raw_signals[:, :-2]
  114. # triggers
  115. with io.open(self.filename + '.pos', mode='rt', encoding='ascii', newline=None) as f:
  116. self._raw_event_timestamps = []
  117. self._event_labels = []
  118. self._reject_codes = []
  119. for l in f.readlines():
  120. r = re.findall(r' *(\d+) *(\d+) *(\d+) *', l)
  121. self._raw_event_timestamps.append(int(r[0][0]))
  122. self._event_labels.append(str(r[0][1]))
  123. self._reject_codes.append(str(r[0][2]))
  124. self._raw_event_timestamps = np.array(self._raw_event_timestamps, dtype='int64')
  125. self._event_labels = np.array(self._event_labels, dtype='U')
  126. self._reject_codes = np.array(self._reject_codes, dtype='U')
  127. event_channels = []
  128. event_channels.append(('Trigger', '', 'event'))
  129. event_channels = np.array(event_channels, dtype=_event_channel_dtype)
  130. # No spikes
  131. unit_channels = []
  132. unit_channels = np.array(unit_channels, dtype=_unit_channel_dtype)
  133. # fille into header dict
  134. self.header = {}
  135. self.header['nb_block'] = 1
  136. self.header['nb_segment'] = [1]
  137. self.header['signal_channels'] = sig_channels
  138. self.header['unit_channels'] = unit_channels
  139. self.header['event_channels'] = event_channels
  140. # insert some annotation at some place
  141. self._generate_minimal_annotations()
  142. extra_info = dict(rec_datetime=fulldatetime, elan_version=version,
  143. info1=info1, info2=info2)
  144. for obj_name in ('blocks', 'segments'):
  145. self._raw_annotate(obj_name, **extra_info)
  146. for c in range(nb_channel):
  147. d = channel_infos[c]
  148. self._raw_annotate('signals', chan_index=c, info_filter=d['info_filter'])
  149. self._raw_annotate('signals', chan_index=c, kind=d['kind'])
  150. self._raw_annotate('events', chan_index=0, reject_codes=self._reject_codes)
  151. def _source_name(self):
  152. return self.filename
  153. def _segment_t_start(self, block_index, seg_index):
  154. return 0.
  155. def _segment_t_stop(self, block_index, seg_index):
  156. t_stop = self._raw_signals.shape[0] / self._sampling_rate
  157. return t_stop
  158. def _get_signal_size(self, block_index, seg_index, channel_indexes=None):
  159. return self._raw_signals.shape[0]
  160. def _get_signal_t_start(self, block_index, seg_index, channel_indexes=None):
  161. return 0.
  162. def _get_analogsignal_chunk(self, block_index, seg_index, i_start, i_stop, channel_indexes):
  163. if channel_indexes is None:
  164. channel_indexes = slice(None)
  165. raw_signals = self._raw_signals[slice(i_start, i_stop), channel_indexes]
  166. return raw_signals
  167. def _spike_count(self, block_index, seg_index, unit_index):
  168. return 0
  169. def _event_count(self, block_index, seg_index, event_channel_index):
  170. return self._raw_event_timestamps.size
  171. def _get_event_timestamps(self, block_index, seg_index, event_channel_index, t_start, t_stop):
  172. timestamp = self._raw_event_timestamps
  173. labels = self._event_labels
  174. durations = None
  175. if t_start is not None:
  176. keep = timestamp >= int(t_start * self._sampling_rate)
  177. timestamp = timestamp[keep]
  178. labels = labels[keep]
  179. if t_stop is not None:
  180. keep = timestamp <= int(t_stop * self._sampling_rate)
  181. timestamp = timestamp[keep]
  182. labels = labels[keep]
  183. return timestamp, durations, labels
  184. def _rescale_event_timestamp(self, event_timestamps, dtype):
  185. event_times = event_timestamps.astype(dtype) / self._sampling_rate
  186. return event_times