elanio.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379
  1. # -*- coding: utf-8 -*-
  2. """
  3. Class for reading/writing data from Elan.
  4. Elan is software for studying time-frequency maps of EEG data.
  5. Elan is developed in Lyon, France, at INSERM U821
  6. An Elan dataset is separated into 3 files :
  7. - .eeg raw data file
  8. - .eeg.ent header file
  9. - .eeg.pos event file
  10. Depend on:
  11. Supported : Read (writing is currently disabled: is_writable is False and write_segment is commented out)
  12. Author: sgarcia
  13. """
  14. import datetime
  15. import os
  16. import re
  17. import numpy as np
  18. import quantities as pq
  19. from neo.io.baseio import BaseIO
  20. from neo.core import Segment, AnalogSignal, Event
  21. class VersionError(Exception):
  22. def __init__(self, value):
  23. self.value = value
  24. def __str__(self):
  25. return repr(self.value)
  26. class ElanIO(BaseIO):
  27. """
  28. Classe for reading/writing data from Elan.
  29. Usage:
  30. >>> from neo import io
  31. >>> r = io.ElanIO(filename='File_elan_1.eeg')
  32. >>> seg = r.read_segment(lazy = False, cascade = True,)
  33. >>> print seg.analogsignals # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE
  34. [<AnalogSignal(array([ 89.21203613, 88.83666992, 87.21008301, ...,
  35. 64.56298828, 67.94128418, 68.44177246], dtype=float32) * pA,
  36. [0.0 s, 101.5808 s], sampling rate: 10000.0 Hz)>]
  37. >>> print seg.spiketrains # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE
  38. []
  39. >>> print seg.events # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE
  40. []
  41. """
  42. is_readable = True
  43. is_writable = False
  44. supported_objects = [Segment, AnalogSignal, Event]
  45. readable_objects = [Segment]
  46. writeable_objects = []
  47. has_header = False
  48. is_streameable = False
  49. read_params = {Segment: []}
  50. write_params = {Segment: []}
  51. name = None
  52. extensions = ['eeg']
  53. mode = 'file'
  54. def __init__(self, filename=None):
  55. """
  56. This class read/write a elan based file.
  57. **Arguments**
  58. filename : the filename to read or write
  59. """
  60. BaseIO.__init__(self)
  61. self.filename = filename
  62. def read_segment(self, lazy=False, cascade=True):
  63. # # Read header file
  64. f = open(self.filename + '.ent', 'rU')
  65. #version
  66. version = f.readline()
  67. if version[:2] != 'V2' and version[:2] != 'V3':
  68. # raise('read only V2 .eeg.ent files')
  69. raise VersionError('Read only V2 or V3 .eeg.ent files. %s given' %
  70. version[:2])
  71. #info
  72. info1 = f.readline()[:-1]
  73. info2 = f.readline()[:-1]
  74. # strange 2 line for datetime
  75. #line1
  76. l = f.readline()
  77. r1 = re.findall('(\d+)-(\d+)-(\d+) (\d+):(\d+):(\d+)', l)
  78. r2 = re.findall('(\d+):(\d+):(\d+)', l)
  79. r3 = re.findall('(\d+)-(\d+)-(\d+)', l)
  80. YY, MM, DD, hh, mm, ss = (None, ) * 6
  81. if len(r1) != 0:
  82. DD, MM, YY, hh, mm, ss = r1[0]
  83. elif len(r2) != 0:
  84. hh, mm, ss = r2[0]
  85. elif len(r3) != 0:
  86. DD, MM, YY = r3[0]
  87. #line2
  88. l = f.readline()
  89. r1 = re.findall('(\d+)-(\d+)-(\d+) (\d+):(\d+):(\d+)', l)
  90. r2 = re.findall('(\d+):(\d+):(\d+)', l)
  91. r3 = re.findall('(\d+)-(\d+)-(\d+)', l)
  92. if len(r1) != 0:
  93. DD, MM, YY, hh, mm, ss = r1[0]
  94. elif len(r2) != 0:
  95. hh, mm, ss = r2[0]
  96. elif len(r3) != 0:
  97. DD, MM, YY = r3[0]
  98. try:
  99. fulldatetime = datetime.datetime(int(YY), int(MM), int(DD),
  100. int(hh), int(mm), int(ss))
  101. except:
  102. fulldatetime = None
  103. seg = Segment(file_origin=os.path.basename(self.filename),
  104. elan_version=version,
  105. info1=info1,
  106. info2=info2,
  107. rec_datetime=fulldatetime)
  108. if not cascade:
  109. return seg
  110. l = f.readline()
  111. l = f.readline()
  112. l = f.readline()
  113. # sampling rate sample
  114. l = f.readline()
  115. sampling_rate = 1. / float(l) * pq.Hz
  116. # nb channel
  117. l = f.readline()
  118. nbchannel = int(l) - 2
  119. #channel label
  120. labels = []
  121. for c in range(nbchannel + 2):
  122. labels.append(f.readline()[:-1])
  123. # channel type
  124. types = []
  125. for c in range(nbchannel + 2):
  126. types.append(f.readline()[:-1])
  127. # channel unit
  128. units = []
  129. for c in range(nbchannel + 2):
  130. units.append(f.readline()[:-1])
  131. #print units
  132. #range
  133. min_physic = []
  134. for c in range(nbchannel + 2):
  135. min_physic.append(float(f.readline()))
  136. max_physic = []
  137. for c in range(nbchannel + 2):
  138. max_physic.append(float(f.readline()))
  139. min_logic = []
  140. for c in range(nbchannel + 2):
  141. min_logic.append(float(f.readline()))
  142. max_logic = []
  143. for c in range(nbchannel + 2):
  144. max_logic.append(float(f.readline()))
  145. #info filter
  146. info_filter = []
  147. for c in range(nbchannel + 2):
  148. info_filter.append(f.readline()[:-1])
  149. f.close()
  150. #raw data
  151. n = int(round(np.log(max_logic[0] - min_logic[0]) / np.log(2)) / 8)
  152. data = np.fromfile(self.filename, dtype='i' + str(n))
  153. data = data.byteswap().reshape(
  154. (data.size / (nbchannel + 2), nbchannel + 2)).astype('f4')
  155. for c in range(nbchannel):
  156. if lazy:
  157. sig = []
  158. else:
  159. sig = (data[:, c] - min_logic[c]) / (
  160. max_logic[c] - min_logic[c]) * \
  161. (max_physic[c] - min_physic[c]) + min_physic[c]
  162. try:
  163. unit = pq.Quantity(1, units[c])
  164. except:
  165. unit = pq.Quantity(1, '')
  166. ana_sig = AnalogSignal(
  167. sig * unit, sampling_rate=sampling_rate,
  168. t_start=0. * pq.s, name=labels[c], channel_index=c)
  169. if lazy:
  170. ana_sig.lazy_shape = data.shape[0]
  171. ana_sig.annotate(channel_name=labels[c])
  172. seg.analogsignals.append(ana_sig)
  173. # triggers
  174. f = open(self.filename + '.pos')
  175. times = []
  176. labels = []
  177. reject_codes = []
  178. for l in f.readlines():
  179. r = re.findall(' *(\d+) *(\d+) *(\d+) *', l)
  180. times.append(float(r[0][0]) / sampling_rate.magnitude)
  181. labels.append(str(r[0][1]))
  182. reject_codes.append(str(r[0][2]))
  183. if lazy:
  184. times = [] * pq.S
  185. labels = np.array([], dtype='S')
  186. reject_codes = []
  187. else:
  188. times = np.array(times) * pq.s
  189. labels = np.array(labels)
  190. reject_codes = np.array(reject_codes)
  191. ea = Event(times=times, labels=labels, reject_codes=reject_codes)
  192. if lazy:
  193. ea.lazy_shape = len(times)
  194. seg.events.append(ea)
  195. f.close()
  196. seg.create_many_to_one_relationship()
  197. return seg
  198. #~ def write_segment(self, segment, ):
  199. #~ """
  200. #~ Arguments:
  201. #~ segment : the segment to write. Only analog signals and events
  202. #~ will be written.
  203. #~ """
  204. #~ assert self.filename.endswith('.eeg')
  205. #~ fid_ent = open(self.filename+'.ent' ,'wt')
  206. #~ fid_eeg = open(self.filename ,'wt')
  207. #~ fid_pos = open(self.filename+'.pos' ,'wt')
  208. #~ seg = segment
  209. #~ sampling_rate = seg._analogsignals[0].sampling_rate
  210. #~ N = len(seg._analogsignals)
  211. #~ #
  212. #~ # header file
  213. #~ #
  214. #~ fid_ent.write('V2\n')
  215. #~ fid_ent.write('OpenElectrophyImport\n')
  216. #~ fid_ent.write('ELAN\n')
  217. #~ t = datetime.datetime.now()
  218. #~ fid_ent.write(t.strftime('%d-%m-%Y %H:%M:%S')+'\n')
  219. #~ fid_ent.write(t.strftime('%d-%m-%Y %H:%M:%S')+'\n')
  220. #~ fid_ent.write('-1\n')
  221. #~ fid_ent.write('reserved\n')
  222. #~ fid_ent.write('-1\n')
  223. #~ fid_ent.write('%g\n' % (1./sampling_rate))
  224. #~ fid_ent.write( '%d\n' % (N+2) )
  225. #~ # channel label
  226. #~ for i, anaSig in enumerate(seg.analogsignals) :
  227. #~ try :
  228. #~ fid_ent.write('%s.%d\n' % (anaSig.label, i+1 ))
  229. #~ except :
  230. #~ fid_ent.write('%s.%d\n' % ('nolabel', i+1 ))
  231. #~ fid_ent.write('Num1\n')
  232. #~ fid_ent.write('Num2\n')
  233. #~ #channel type
  234. #~ for i, anaSig in enumerate(seg.analogsignals) :
  235. #~ fid_ent.write('Electrode\n')
  236. #~ fid_ent.write( 'dateur echantillon\n')
  237. #~ fid_ent.write( 'type evenement et byte info\n')
  238. #~ #units
  239. #~ for i, anaSig in enumerate(seg._analogsignals) :
  240. #~ unit_txt = str(anaSig.units).split(' ')[1]
  241. #~ fid_ent.write('%s\n' % unit_txt)
  242. #~ fid_ent.write('sans\n')
  243. #~ fid_ent.write('sans\n')
  244. #~ #range and data
  245. #~ list_range = []
  246. #~ data = np.zeros( (seg._analogsignals[0].size , N+2) , 'i2')
  247. #~ for i, anaSig in enumerate(seg._analogsignals) :
  248. #~ # in elan file unit is supposed to be in microV to have a big range
  249. #~ # so auto translate
  250. #~ if anaSig.units == pq.V or anaSig.units == pq.mV:
  251. #~ s = anaSig.rescale('uV').magnitude
  252. #~ elif anaSig.units == pq.uV:
  253. #~ s = anaSig.magnitude
  254. #~ else:
  255. #~ # automatic range in arbitrry unit
  256. #~ s = anaSig.magnitude
  257. #~ s*= 10**(int(np.log10(abs(s).max()))+1)
  258. #~ list_range.append( int(abs(s).max()) +1 )
  259. #~ s2 = s*65535/(2*list_range[i])
  260. #~ data[:,i] = s2.astype('i2')
  261. #~ for r in list_range :
  262. #~ fid_ent.write('-%.0f\n'% r)
  263. #~ fid_ent.write('-1\n')
  264. #~ fid_ent.write('-1\n')
  265. #~ for r in list_range :
  266. #~ fid_ent.write('%.0f\n'% r)
  267. #~ fid_ent.write('+1\n')
  268. #~ fid_ent.write('+1\n')
  269. #~ for i in range(N+2) :
  270. #~ fid_ent.write('-32768\n')
  271. #~ for i in range(N+2) :
  272. #~ fid_ent.write('+32767\n')
  273. #~ #info filter
  274. #~ for i in range(N+2) :
  275. #~ fid_ent.write('passe-haut ? Hz passe-bas ? Hz\n')
  276. #~ fid_ent.write('sans\n')
  277. #~ fid_ent.write('sans\n')
  278. #~ for i in range(N+2) :
  279. #~ fid_ent.write('1\n')
  280. #~ for i in range(N+2) :
  281. #~ fid_ent.write('reserved\n')
  282. #~ # raw file .eeg
  283. #~ if len(seg._eventarrays) == 1:
  284. #~ ea = seg._eventarrays[0]
  285. #~ trigs = (ea.times*sampling_rate).magnitude
  286. #~ trigs = trigs.astype('i')
  287. #~ trigs2 = trigs[ (trigs>0) & (trigs<data.shape[0]) ]
  288. #~ data[trigs2,-1] = 1
  289. #~ fid_eeg.write(data.byteswap().tostring())
  290. #~ # pos file eeg.pos
  291. #~ if len(seg._eventarrays) == 1:
  292. #~ ea = seg._eventarray[0]
  293. #~ if 'reject_codes' in ea.annotations and \
  294. #~ len(ea.reject_codes) == len(ea.times):
  295. #~ rcs = ea.reject_codes
  296. #~ else:
  297. #~ rcs = np.array( [ '' ]*ea.times.size)
  298. #~ if len(ea.labels) == len(ea.times):
  299. #~ labels = ea.labels
  300. #~ else:
  301. #~ labels = np.array( [ '' ]*ea.times.size)
  302. #~ for t, label, rc in zip(ea.times, labels, rcs):
  303. #~ fid_pos.write('%d %s %s\n' % (trigs[i] , ev.label,0))
  304. #~ fid_ent.close()
  305. #~ fid_eeg.close()
  306. #~ fid_pos.close()