winedrio.py 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177
  1. # -*- coding: utf-8 -*-
  2. """
  3. Classe for reading data from WinEdr, a software tool written by
  4. John Dempster.
  5. WinEdr is free:
  6. http://spider.science.strath.ac.uk/sipbs/software.htm
  7. Depend on:
  8. Supported : Read
  9. Author: sgarcia
  10. """
  11. import os
  12. import struct
  13. import sys
  14. import numpy as np
  15. import quantities as pq
  16. from neo.io.baseio import BaseIO
  17. from neo.core import Segment, AnalogSignal
  18. PY3K = (sys.version_info[0] == 3)
  19. class WinEdrIO(BaseIO):
  20. """
  21. Class for reading data from WinEDR.
  22. Usage:
  23. >>> from neo import io
  24. >>> r = io.WinEdrIO(filename='File_WinEDR_1.EDR')
  25. >>> seg = r.read_segment(lazy=False, cascade=True,)
  26. >>> print seg.analogsignals
  27. [<AnalogSignal(array([ 89.21203613, 88.83666992, 87.21008301, ..., 64.56298828,
  28. 67.94128418, 68.44177246], dtype=float32) * pA, [0.0 s, 101.5808 s], sampling rate: 10000.0 Hz)>]
  29. """
  30. is_readable = True
  31. is_writable = False
  32. supported_objects = [ Segment , AnalogSignal ]
  33. readable_objects = [Segment]
  34. writeable_objects = []
  35. has_header = False
  36. is_streameable = False
  37. read_params = { Segment : [ ], }
  38. write_params = None
  39. name = 'WinEDR'
  40. extensions = [ 'EDR' ]
  41. mode = 'file'
  42. def __init__(self , filename = None) :
  43. """
  44. This class read a WinEDR file.
  45. Arguments:
  46. filename : the filename
  47. """
  48. BaseIO.__init__(self)
  49. self.filename = filename
  50. def read_segment(self , lazy = False, cascade = True):
  51. seg = Segment(
  52. file_origin = os.path.basename(self.filename),
  53. )
  54. if not cascade:
  55. return seg
  56. fid = open(self.filename , 'rb')
  57. headertext = fid.read(2048)
  58. if PY3K:
  59. headertext = headertext.decode('ascii')
  60. header = {}
  61. for line in headertext.split('\r\n'):
  62. if '=' not in line : continue
  63. #print '#' , line , '#'
  64. key,val = line.split('=')
  65. if key in ['NC', 'NR','NBH','NBA','NBD','ADCMAX','NP','NZ','ADCMAX' ] :
  66. val = int(val)
  67. elif key in ['AD', 'DT', ] :
  68. val = val.replace(',','.')
  69. val = float(val)
  70. header[key] = val
  71. if not lazy:
  72. data = np.memmap(self.filename , np.dtype('i2') , 'r',
  73. #shape = (header['NC'], header['NP']) ,
  74. shape = (header['NP']//header['NC'],header['NC'], ) ,
  75. offset = header['NBH'])
  76. for c in range(header['NC']):
  77. YCF = float(header['YCF%d'%c].replace(',','.'))
  78. YAG = float(header['YAG%d'%c].replace(',','.'))
  79. YZ = float(header['YZ%d'%c].replace(',','.'))
  80. ADCMAX = header['ADCMAX']
  81. AD = header['AD']
  82. DT = header['DT']
  83. if 'TU' in header:
  84. if header['TU'] == 'ms':
  85. DT *= .001
  86. unit = header['YU%d'%c]
  87. try :
  88. unit = pq.Quantity(1., unit)
  89. except:
  90. unit = pq.Quantity(1., '')
  91. if lazy:
  92. signal = [ ] * unit
  93. else:
  94. chan = int(header['YO%d'%c])
  95. signal = (data[:,chan].astype('float32')-YZ) *AD/( YCF*YAG*(ADCMAX+1)) * unit
  96. ana = AnalogSignal(signal,
  97. sampling_rate=pq.Hz / DT,
  98. t_start=0. * pq.s,
  99. name=header['YN%d' % c],
  100. channel_index=c)
  101. if lazy:
  102. ana.lazy_shape = header['NP']/header['NC']
  103. seg.analogsignals.append(ana)
  104. seg.create_many_to_one_relationship()
  105. fid.close()
  106. return seg
  107. AnalysisDescription = [
  108. ('RecordStatus','8s'),
  109. ('RecordType','4s'),
  110. ('GroupNumber','f'),
  111. ('TimeRecorded','f'),
  112. ('SamplingInterval','f'),
  113. ('VMax','8f'),
  114. ]
  115. class HeaderReader():
  116. def __init__(self,fid ,description ):
  117. self.fid = fid
  118. self.description = description
  119. def read_f(self, offset =0):
  120. self.fid.seek(offset)
  121. d = { }
  122. for key, fmt in self.description :
  123. val = struct.unpack(fmt , self.fid.read(struct.calcsize(fmt)))
  124. if len(val) == 1:
  125. val = val[0]
  126. else :
  127. val = list(val)
  128. d[key] = val
  129. return d