winwcpio.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185
  1. # -*- coding: utf-8 -*-
  2. """
  3. Class for reading data from WinWCP, a software tool written by
  4. John Dempster.
  5. WinWCP is free:
  6. http://spider.science.strath.ac.uk/sipbs/software.htm
  7. Supported : Read
  8. Author : sgarcia
  9. """
  10. import os
  11. import struct
  12. import sys
  13. import numpy as np
  14. import quantities as pq
  15. from neo.io.baseio import BaseIO
  16. from neo.core import Block, Segment, AnalogSignal
  17. PY3K = (sys.version_info[0] == 3)
  18. class WinWcpIO(BaseIO):
  19. """
  20. Class for reading from a WinWCP file.
  21. Usage:
  22. >>> from neo import io
  23. >>> r = io.WinWcpIO( filename = 'File_winwcp_1.wcp')
  24. >>> bl = r.read_block(lazy = False, cascade = True,)
  25. >>> print bl.segments # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE
  26. [<neo.core.segment.Segment object at 0x1057bd350>, <neo.core.segment.Segment object at 0x1057bd2d0>,
  27. ...
  28. >>> print bl.segments[0].analogsignals
  29. [<AnalogSignal(array([-2438.73388672, -2428.96801758, -2425.61083984, ..., -2695.39453125,
  30. ...
  31. """
  32. is_readable = True
  33. is_writable = False
  34. supported_objects = [Block, Segment , AnalogSignal ]
  35. readable_objects = [Block]
  36. writeable_objects = []
  37. has_header = False
  38. is_streameable = False
  39. read_params = { Block : [ ], }
  40. write_params = None
  41. name = 'WinWCP'
  42. extensions = [ 'wcp' ]
  43. mode = 'file'
  44. def __init__(self , filename = None) :
  45. """
  46. This class read a WinWCP wcp file.
  47. Arguments:
  48. filename : the filename to read
  49. """
  50. BaseIO.__init__(self)
  51. self.filename = filename
  52. def read_block(self , lazy = False,
  53. cascade = True,
  54. ):
  55. bl = Block( file_origin = os.path.basename(self.filename), )
  56. if not cascade:
  57. return bl
  58. fid = open(self.filename , 'rb')
  59. headertext = fid.read(1024)
  60. if PY3K:
  61. headertext = headertext.decode('ascii')
  62. header = {}
  63. for line in headertext.split('\r\n'):
  64. if '=' not in line : continue
  65. #print '#' , line , '#'
  66. key,val = line.split('=')
  67. if key in ['NC', 'NR','NBH','NBA','NBD','ADCMAX','NP','NZ', ] :
  68. val = int(val)
  69. elif key in ['AD', 'DT', ] :
  70. val = val.replace(',','.')
  71. val = float(val)
  72. header[key] = val
  73. #print header
  74. SECTORSIZE = 512
  75. # loop for record number
  76. for i in range(header['NR']):
  77. #print 'record ',i
  78. offset = 1024 + i*(SECTORSIZE*header['NBD']+1024)
  79. # read analysis zone
  80. analysisHeader = HeaderReader(fid , AnalysisDescription ).read_f(offset = offset)
  81. #print analysisHeader
  82. # read data
  83. NP = (SECTORSIZE*header['NBD'])/2
  84. NP = NP - NP%header['NC']
  85. NP = int(NP//header['NC'])
  86. if not lazy:
  87. data = np.memmap(self.filename , np.dtype('int16') , mode='r',
  88. shape=(NP,header['NC'], ) ,
  89. offset=offset+header['NBA']*SECTORSIZE)
  90. # create a segment
  91. seg = Segment()
  92. bl.segments.append(seg)
  93. for c in range(header['NC']):
  94. unit = header['YU%d'%c]
  95. try :
  96. unit = pq.Quantity(1., unit)
  97. except:
  98. unit = pq.Quantity(1., '')
  99. if lazy:
  100. signal = [ ] * unit
  101. else:
  102. YG = float(header['YG%d'%c].replace(',','.'))
  103. ADCMAX = header['ADCMAX']
  104. VMax = analysisHeader['VMax'][c]
  105. chan = int(header['YO%d'%c])
  106. signal = data[:,chan].astype('f4')*VMax/ADCMAX/YG * unit
  107. anaSig = AnalogSignal(signal,
  108. sampling_rate=
  109. pq.Hz /
  110. analysisHeader['SamplingInterval'] ,
  111. t_start=analysisHeader['TimeRecorded'] *
  112. pq.s,
  113. name=header['YN%d'%c], channel_index=c)
  114. if lazy:
  115. anaSig.lazy_shape = NP
  116. seg.analogsignals.append(anaSig)
  117. fid.close()
  118. bl.create_many_to_one_relationship()
  119. return bl
  120. AnalysisDescription = [
  121. ('RecordStatus','8s'),
  122. ('RecordType','4s'),
  123. ('GroupNumber','f'),
  124. ('TimeRecorded','f'),
  125. ('SamplingInterval','f'),
  126. ('VMax','8f'),
  127. ]
  128. class HeaderReader():
  129. def __init__(self,fid ,description ):
  130. self.fid = fid
  131. self.description = description
  132. def read_f(self, offset =0):
  133. self.fid.seek(offset)
  134. d = { }
  135. for key, fmt in self.description :
  136. val = struct.unpack(fmt , self.fid.read(struct.calcsize(fmt)))
  137. if len(val) == 1:
  138. val = val[0]
  139. else :
  140. val = list(val)
  141. d[key] = val
  142. return d