Scheduled service maintenance on November 22


On Friday, November 22, 2024, between 06:00 CET and 18:00 CET, GIN services will undergo planned maintenance. Extended service interruptions should be expected. We will try to keep downtimes to a minimum, but recommend that users avoid critical tasks, large data uploads, or DOI requests during this time.

We apologize for any inconvenience.

winedrio.py 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175
  1. # -*- coding: utf-8 -*-
  2. """
  3. Classe for reading data from WinEdr, a software tool written by
  4. John Dempster.
  5. WinEdr is free:
  6. http://spider.science.strath.ac.uk/sipbs/software.htm
  7. Depend on:
  8. Supported : Read
  9. Author: sgarcia
  10. """
  11. import os
  12. import struct
  13. import sys
  14. import numpy as np
  15. import quantities as pq
  16. from neo.io.baseio import BaseIO
  17. from neo.core import Segment, AnalogSignal
  18. PY3K = (sys.version_info[0] == 3)
  19. class WinEdrIO(BaseIO):
  20. """
  21. Class for reading data from WinEDR.
  22. Usage:
  23. >>> from neo import io
  24. >>> r = io.WinEdrIO(filename='File_WinEDR_1.EDR')
  25. >>> seg = r.read_segment(lazy=False, cascade=True,)
  26. >>> print seg.analogsignals
  27. [<AnalogSignal(array([ 89.21203613, 88.83666992, 87.21008301, ..., 64.56298828,
  28. 67.94128418, 68.44177246], dtype=float32) * pA, [0.0 s, 101.5808 s], sampling rate: 10000.0 Hz)>]
  29. """
  30. is_readable = True
  31. is_writable = False
  32. supported_objects = [ Segment , AnalogSignal ]
  33. readable_objects = [Segment]
  34. writeable_objects = []
  35. has_header = False
  36. is_streameable = False
  37. read_params = { Segment : [ ], }
  38. write_params = None
  39. name = 'WinEDR'
  40. extensions = [ 'EDR' ]
  41. mode = 'file'
  42. def __init__(self , filename = None) :
  43. """
  44. This class read a WinEDR file.
  45. Arguments:
  46. filename : the filename
  47. """
  48. BaseIO.__init__(self)
  49. self.filename = filename
  50. def read_segment(self , lazy = False, cascade = True):
  51. seg = Segment(
  52. file_origin = os.path.basename(self.filename),
  53. )
  54. if not cascade:
  55. return seg
  56. fid = open(self.filename , 'rb')
  57. headertext = fid.read(2048)
  58. if PY3K:
  59. headertext = headertext.decode('ascii')
  60. header = {}
  61. for line in headertext.split('\r\n'):
  62. if '=' not in line : continue
  63. #print '#' , line , '#'
  64. key,val = line.split('=')
  65. if key in ['NC', 'NR','NBH','NBA','NBD','ADCMAX','NP','NZ','ADCMAX' ] :
  66. val = int(val)
  67. elif key in ['AD', 'DT', ] :
  68. val = val.replace(',','.')
  69. val = float(val)
  70. header[key] = val
  71. if not lazy:
  72. data = np.memmap(self.filename , np.dtype('i2') , 'r',
  73. #shape = (header['NC'], header['NP']) ,
  74. shape = (header['NP']/header['NC'],header['NC'], ) ,
  75. offset = header['NBH'])
  76. for c in range(header['NC']):
  77. YCF = float(header['YCF%d'%c].replace(',','.'))
  78. YAG = float(header['YAG%d'%c].replace(',','.'))
  79. YZ = float(header['YZ%d'%c].replace(',','.'))
  80. ADCMAX = header['ADCMAX']
  81. AD = header['AD']
  82. DT = header['DT']
  83. if 'TU' in header:
  84. if header['TU'] == 'ms':
  85. DT *= .001
  86. unit = header['YU%d'%c]
  87. try :
  88. unit = pq.Quantity(1., unit)
  89. except:
  90. unit = pq.Quantity(1., '')
  91. if lazy:
  92. signal = [ ] * unit
  93. else:
  94. signal = (data[:,header['YO%d'%c]].astype('f4')-YZ) *AD/( YCF*YAG*(ADCMAX+1)) * unit
  95. ana = AnalogSignal(signal,
  96. sampling_rate=pq.Hz / DT,
  97. t_start=0. * pq.s,
  98. name=header['YN%d' % c],
  99. channel_index=c)
  100. if lazy:
  101. ana.lazy_shape = header['NP']/header['NC']
  102. seg.analogsignals.append(ana)
  103. seg.create_many_to_one_relationship()
  104. return seg
  105. AnalysisDescription = [
  106. ('RecordStatus','8s'),
  107. ('RecordType','4s'),
  108. ('GroupNumber','f'),
  109. ('TimeRecorded','f'),
  110. ('SamplingInterval','f'),
  111. ('VMax','8f'),
  112. ]
  113. class HeaderReader():
  114. def __init__(self,fid ,description ):
  115. self.fid = fid
  116. self.description = description
  117. def read_f(self, offset =0):
  118. self.fid.seek(offset)
  119. d = { }
  120. for key, fmt in self.description :
  121. val = struct.unpack(fmt , self.fid.read(struct.calcsize(fmt)))
  122. if len(val) == 1:
  123. val = val[0]
  124. else :
  125. val = list(val)
  126. d[key] = val
  127. return d