elanio.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381
  1. # -*- coding: utf-8 -*-
  2. """
  3. Class for reading/writing data from Elan.
  4. Elan is software for studying time-frequency maps of EEG data.
  5. Elan is developed in Lyon, France, at INSERM U821
  6. An Elan dataset is separated into 3 files :
  7. - .eeg raw data file
  8. - .eeg.ent hearder file
  9. - .eeg.pos event file
  10. Depend on:
  11. Supported : Read and Write
  12. Author: sgarcia
  13. """
  14. import datetime
  15. import os
  16. import re
  17. import numpy as np
  18. import quantities as pq
  19. from neo.io.baseio import BaseIO
  20. from neo.core import Segment, AnalogSignal, Event
  21. class VersionError(Exception):
  22. def __init__(self, value):
  23. self.value = value
  24. def __str__(self):
  25. return repr(self.value)
  26. import io
  27. class ElanIO(BaseIO):
  28. """
  29. Classe for reading/writing data from Elan.
  30. Usage:
  31. >>> from neo import io
  32. >>> r = io.ElanIO(filename='File_elan_1.eeg')
  33. >>> seg = r.read_segment(lazy = False, cascade = True,)
  34. >>> print seg.analogsignals # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE
  35. [<AnalogSignal(array([ 89.21203613, 88.83666992, 87.21008301, ...,
  36. 64.56298828, 67.94128418, 68.44177246], dtype=float32) * pA,
  37. [0.0 s, 101.5808 s], sampling rate: 10000.0 Hz)>]
  38. >>> print seg.spiketrains # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE
  39. []
  40. >>> print seg.events # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE
  41. []
  42. """
  43. is_readable = True
  44. is_writable = False
  45. supported_objects = [Segment, AnalogSignal, Event]
  46. readable_objects = [Segment]
  47. writeable_objects = []
  48. has_header = False
  49. is_streameable = False
  50. read_params = {Segment: []}
  51. write_params = {Segment: []}
  52. name = None
  53. extensions = ['eeg']
  54. mode = 'file'
  55. def __init__(self, filename=None):
  56. """
  57. This class read/write a elan based file.
  58. **Arguments**
  59. filename : the filename to read or write
  60. """
  61. BaseIO.__init__(self)
  62. self.filename = filename
  63. def read_segment(self, lazy=False, cascade=True):
  64. # # Read header file
  65. f = io.open(self.filename + '.ent', mode='rt', encoding='ascii')
  66. #version
  67. version = f.readline()
  68. if version[:2] != 'V2' and version[:2] != 'V3':
  69. # raise('read only V2 .eeg.ent files')
  70. raise VersionError('Read only V2 or V3 .eeg.ent files. %s given' %
  71. version[:2])
  72. #info
  73. info1 = f.readline()[:-1]
  74. info2 = f.readline()[:-1]
  75. # strange 2 line for datetime
  76. #line1
  77. l = f.readline()
  78. r1 = re.findall('(\d+)-(\d+)-(\d+) (\d+):(\d+):(\d+)', l)
  79. r2 = re.findall('(\d+):(\d+):(\d+)', l)
  80. r3 = re.findall('(\d+)-(\d+)-(\d+)', l)
  81. YY, MM, DD, hh, mm, ss = (None, ) * 6
  82. if len(r1) != 0:
  83. DD, MM, YY, hh, mm, ss = r1[0]
  84. elif len(r2) != 0:
  85. hh, mm, ss = r2[0]
  86. elif len(r3) != 0:
  87. DD, MM, YY = r3[0]
  88. #line2
  89. l = f.readline()
  90. r1 = re.findall('(\d+)-(\d+)-(\d+) (\d+):(\d+):(\d+)', l)
  91. r2 = re.findall('(\d+):(\d+):(\d+)', l)
  92. r3 = re.findall('(\d+)-(\d+)-(\d+)', l)
  93. if len(r1) != 0:
  94. DD, MM, YY, hh, mm, ss = r1[0]
  95. elif len(r2) != 0:
  96. hh, mm, ss = r2[0]
  97. elif len(r3) != 0:
  98. DD, MM, YY = r3[0]
  99. try:
  100. fulldatetime = datetime.datetime(int(YY), int(MM), int(DD),
  101. int(hh), int(mm), int(ss))
  102. except:
  103. fulldatetime = None
  104. seg = Segment(file_origin=os.path.basename(self.filename),
  105. elan_version=version,
  106. info1=info1,
  107. info2=info2,
  108. rec_datetime=fulldatetime)
  109. if not cascade:
  110. f.close()
  111. return seg
  112. l = f.readline()
  113. l = f.readline()
  114. l = f.readline()
  115. # sampling rate sample
  116. l = f.readline()
  117. sampling_rate = 1. / float(l) * pq.Hz
  118. # nb channel
  119. l = f.readline()
  120. nbchannel = int(l) - 2
  121. #channel label
  122. labels = []
  123. for c in range(nbchannel + 2):
  124. labels.append(f.readline()[:-1])
  125. # channel type
  126. types = []
  127. for c in range(nbchannel + 2):
  128. types.append(f.readline()[:-1])
  129. # channel unit
  130. units = []
  131. for c in range(nbchannel + 2):
  132. units.append(f.readline()[:-1])
  133. #print units
  134. #range
  135. min_physic = []
  136. for c in range(nbchannel + 2):
  137. min_physic.append(float(f.readline()))
  138. max_physic = []
  139. for c in range(nbchannel + 2):
  140. max_physic.append(float(f.readline()))
  141. min_logic = []
  142. for c in range(nbchannel + 2):
  143. min_logic.append(float(f.readline()))
  144. max_logic = []
  145. for c in range(nbchannel + 2):
  146. max_logic.append(float(f.readline()))
  147. #info filter
  148. info_filter = []
  149. for c in range(nbchannel + 2):
  150. info_filter.append(f.readline()[:-1])
  151. f.close()
  152. #raw data
  153. n = int(round(np.log(max_logic[0] - min_logic[0]) / np.log(2)) / 8)
  154. data = np.fromfile(self.filename, dtype='i' + str(n))
  155. data = data.byteswap().reshape(
  156. (data.size // (nbchannel + 2), nbchannel + 2)).astype('float32')
  157. for c in range(nbchannel):
  158. if lazy:
  159. sig = []
  160. else:
  161. sig = (data[:, c] - min_logic[c]) / (
  162. max_logic[c] - min_logic[c]) * \
  163. (max_physic[c] - min_physic[c]) + min_physic[c]
  164. try:
  165. unit = pq.Quantity(1, units[c])
  166. except:
  167. unit = pq.Quantity(1, '')
  168. ana_sig = AnalogSignal(
  169. sig * unit, sampling_rate=sampling_rate,
  170. t_start=0. * pq.s, name=str(labels[c]), channel_index=c)
  171. if lazy:
  172. ana_sig.lazy_shape = data.shape[0]
  173. ana_sig.annotate(channel_name=labels[c])
  174. seg.analogsignals.append(ana_sig)
  175. # triggers
  176. f = open(self.filename + '.pos')
  177. times = []
  178. labels = []
  179. reject_codes = []
  180. for l in f.readlines():
  181. r = re.findall(' *(\d+) *(\d+) *(\d+) *', l)
  182. times.append(float(r[0][0]) / sampling_rate.magnitude)
  183. labels.append(str(r[0][1]))
  184. reject_codes.append(str(r[0][2]))
  185. if lazy:
  186. times = [] * pq.S
  187. labels = np.array([], dtype='S')
  188. reject_codes = []
  189. else:
  190. times = np.array(times) * pq.s
  191. labels = np.array(labels, dtype='S')
  192. reject_codes = np.array(reject_codes)
  193. ea = Event(times=times, labels=labels, reject_codes=reject_codes)
  194. if lazy:
  195. ea.lazy_shape = len(times)
  196. seg.events.append(ea)
  197. f.close()
  198. seg.create_many_to_one_relationship()
  199. return seg
  200. #~ def write_segment(self, segment, ):
  201. #~ """
  202. #~ Arguments:
  203. #~ segment : the segment to write. Only analog signals and events
  204. #~ will be written.
  205. #~ """
  206. #~ assert self.filename.endswith('.eeg')
  207. #~ fid_ent = open(self.filename+'.ent' ,'wt')
  208. #~ fid_eeg = open(self.filename ,'wt')
  209. #~ fid_pos = open(self.filename+'.pos' ,'wt')
  210. #~ seg = segment
  211. #~ sampling_rate = seg._analogsignals[0].sampling_rate
  212. #~ N = len(seg._analogsignals)
  213. #~ #
  214. #~ # header file
  215. #~ #
  216. #~ fid_ent.write('V2\n')
  217. #~ fid_ent.write('OpenElectrophyImport\n')
  218. #~ fid_ent.write('ELAN\n')
  219. #~ t = datetime.datetime.now()
  220. #~ fid_ent.write(t.strftime('%d-%m-%Y %H:%M:%S')+'\n')
  221. #~ fid_ent.write(t.strftime('%d-%m-%Y %H:%M:%S')+'\n')
  222. #~ fid_ent.write('-1\n')
  223. #~ fid_ent.write('reserved\n')
  224. #~ fid_ent.write('-1\n')
  225. #~ fid_ent.write('%g\n' % (1./sampling_rate))
  226. #~ fid_ent.write( '%d\n' % (N+2) )
  227. #~ # channel label
  228. #~ for i, anaSig in enumerate(seg.analogsignals) :
  229. #~ try :
  230. #~ fid_ent.write('%s.%d\n' % (anaSig.label, i+1 ))
  231. #~ except :
  232. #~ fid_ent.write('%s.%d\n' % ('nolabel', i+1 ))
  233. #~ fid_ent.write('Num1\n')
  234. #~ fid_ent.write('Num2\n')
  235. #~ #channel type
  236. #~ for i, anaSig in enumerate(seg.analogsignals) :
  237. #~ fid_ent.write('Electrode\n')
  238. #~ fid_ent.write( 'dateur echantillon\n')
  239. #~ fid_ent.write( 'type evenement et byte info\n')
  240. #~ #units
  241. #~ for i, anaSig in enumerate(seg._analogsignals) :
  242. #~ unit_txt = str(anaSig.units).split(' ')[1]
  243. #~ fid_ent.write('%s\n' % unit_txt)
  244. #~ fid_ent.write('sans\n')
  245. #~ fid_ent.write('sans\n')
  246. #~ #range and data
  247. #~ list_range = []
  248. #~ data = np.zeros( (seg._analogsignals[0].size , N+2) , 'i2')
  249. #~ for i, anaSig in enumerate(seg._analogsignals) :
  250. #~ # in elan file unit is supposed to be in microV to have a big range
  251. #~ # so auto translate
  252. #~ if anaSig.units == pq.V or anaSig.units == pq.mV:
  253. #~ s = anaSig.rescale('uV').magnitude
  254. #~ elif anaSig.units == pq.uV:
  255. #~ s = anaSig.magnitude
  256. #~ else:
  257. #~ # automatic range in arbitrry unit
  258. #~ s = anaSig.magnitude
  259. #~ s*= 10**(int(np.log10(abs(s).max()))+1)
  260. #~ list_range.append( int(abs(s).max()) +1 )
  261. #~ s2 = s*65535/(2*list_range[i])
  262. #~ data[:,i] = s2.astype('i2')
  263. #~ for r in list_range :
  264. #~ fid_ent.write('-%.0f\n'% r)
  265. #~ fid_ent.write('-1\n')
  266. #~ fid_ent.write('-1\n')
  267. #~ for r in list_range :
  268. #~ fid_ent.write('%.0f\n'% r)
  269. #~ fid_ent.write('+1\n')
  270. #~ fid_ent.write('+1\n')
  271. #~ for i in range(N+2) :
  272. #~ fid_ent.write('-32768\n')
  273. #~ for i in range(N+2) :
  274. #~ fid_ent.write('+32767\n')
  275. #~ #info filter
  276. #~ for i in range(N+2) :
  277. #~ fid_ent.write('passe-haut ? Hz passe-bas ? Hz\n')
  278. #~ fid_ent.write('sans\n')
  279. #~ fid_ent.write('sans\n')
  280. #~ for i in range(N+2) :
  281. #~ fid_ent.write('1\n')
  282. #~ for i in range(N+2) :
  283. #~ fid_ent.write('reserved\n')
  284. #~ # raw file .eeg
  285. #~ if len(seg._eventarrays) == 1:
  286. #~ ea = seg._eventarrays[0]
  287. #~ trigs = (ea.times*sampling_rate).magnitude
  288. #~ trigs = trigs.astype('i')
  289. #~ trigs2 = trigs[ (trigs>0) & (trigs<data.shape[0]) ]
  290. #~ data[trigs2,-1] = 1
  291. #~ fid_eeg.write(data.byteswap().tostring())
  292. #~ # pos file eeg.pos
  293. #~ if len(seg._eventarrays) == 1:
  294. #~ ea = seg._eventarray[0]
  295. #~ if 'reject_codes' in ea.annotations and \
  296. #~ len(ea.reject_codes) == len(ea.times):
  297. #~ rcs = ea.reject_codes
  298. #~ else:
  299. #~ rcs = np.array( [ '' ]*ea.times.size)
  300. #~ if len(ea.labels) == len(ea.times):
  301. #~ labels = ea.labels
  302. #~ else:
  303. #~ labels = np.array( [ '' ]*ea.times.size)
  304. #~ for t, label, rc in zip(ea.times, labels, rcs):
  305. #~ fid_pos.write('%d %s %s\n' % (trigs[i] , ev.label,0))
  306. #~ fid_ent.close()
  307. #~ fid_eeg.close()
  308. #~ fid_pos.close()