rawbinarysignalio.py 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192
  1. # -*- coding: utf-8 -*-
  2. """
  3. Class for reading/writing data in a raw binary interleaved compact file.
  4. Sampling rate, units, number of channels and dtype must be known externally.
  5. This generic format is quite widely used in old acquisition systems and is quite universal
  6. for sharing data.
  7. Supported : Read/Write
  8. Author: sgarcia
  9. """
  10. import os
  11. import numpy as np
  12. import quantities as pq
  13. from neo.io.baseio import BaseIO
  14. from neo.core import Segment, AnalogSignal
  15. class RawBinarySignalIO(BaseIO):
  16. """
  17. Class for reading/writing data in a raw binary interleaved compact file.
  18. Usage:
  19. >>> from neo import io
  20. >>> r = io.RawBinarySignalIO( filename = 'File_ascii_signal_2.txt')
  21. >>> seg = r.read_segment(lazy = False, cascade = True,)
  22. >>> print seg.analogsignals # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE
  23. ...
  24. """
  25. is_readable = True
  26. is_writable = True
  27. supported_objects = [Segment , AnalogSignal]
  28. readable_objects = [ Segment]
  29. writeable_objects = [Segment]
  30. has_header = False
  31. is_streameable = False
  32. read_params = { Segment : [
  33. ('sampling_rate' , { 'value' : 1000. } ) ,
  34. ('nbchannel' , { 'value' : 16 } ),
  35. ('bytesoffset' , { 'value' : 0 } ),
  36. ('t_start' , { 'value' : 0. } ),
  37. ('dtype' , { 'value' : 'float32' , 'possible' : ['float32' , 'float64',
  38. 'int16' , 'uint16', 'int32' , 'uint32', ] } ),
  39. ('rangemin' , { 'value' : -10 } ),
  40. ('rangemax' , { 'value' : 10 } ),
  41. ]
  42. }
  43. write_params = { Segment : [
  44. ('bytesoffset' , { 'value' : 0 } ),
  45. ('dtype' , { 'value' : 'float32' , 'possible' : ['float32' , 'float64',
  46. 'int16' , 'uint16', 'int32' , 'uint32', ] } ),
  47. ('rangemin' , { 'value' : -10 } ),
  48. ('rangemax' , { 'value' : 10 } ),
  49. ]
  50. }
  51. name = None
  52. extensions = [ 'raw' ]
  53. mode = 'file'
  54. def __init__(self , filename = None) :
  55. """
  56. This class read a binary file.
  57. **Arguments**
  58. filename : the filename to read
  59. """
  60. BaseIO.__init__(self)
  61. self.filename = filename
  62. def read_segment(self,
  63. cascade = True,
  64. lazy = False,
  65. sampling_rate = 1.*pq.Hz,
  66. t_start = 0.*pq.s,
  67. unit = pq.V,
  68. nbchannel = 1,
  69. bytesoffset = 0,
  70. dtype = 'f4',
  71. rangemin = -10,
  72. rangemax = 10,
  73. ):
  74. """
  75. Reading signal in a raw binary interleaved compact file.
  76. Arguments:
  77. sampling_rate : sample rate
  78. t_start : time of the first sample sample of each channel
  79. unit: unit of AnalogSignal can be a str or directly a Quantities
  80. nbchannel : number of channel
  81. bytesoffset : nb of bytes offset at the start of file
  82. dtype : dtype of the data
  83. rangemin , rangemax : if the dtype is integer, range can give in volt the min and the max of the range
  84. """
  85. seg = Segment(file_origin = os.path.basename(self.filename))
  86. if not cascade:
  87. return seg
  88. dtype = np.dtype(dtype)
  89. if type(sampling_rate) == float or type(sampling_rate)==int:
  90. # if not quantitities Hz by default
  91. sampling_rate = sampling_rate*pq.Hz
  92. if type(t_start) == float or type(t_start)==int:
  93. # if not quantitities s by default
  94. t_start = t_start*pq.s
  95. unit = pq.Quantity(1, unit)
  96. if lazy:
  97. sig = []
  98. else:
  99. sig = np.memmap(self.filename, dtype = dtype, mode = 'r', offset = bytesoffset)
  100. if sig.size % nbchannel != 0 :
  101. sig = sig[:- sig.size%nbchannel]
  102. sig = sig.reshape((sig.size//nbchannel,nbchannel))
  103. if dtype.kind == 'i' :
  104. sig = sig.astype('f')
  105. sig /= 2**(8*dtype.itemsize)
  106. sig *= ( rangemax-rangemin )
  107. sig += ( rangemax+rangemin )/2.
  108. elif dtype.kind == 'u' :
  109. sig = sig.astype('f')
  110. sig /= 2**(8*dtype.itemsize)
  111. sig *= ( rangemax-rangemin )
  112. sig += rangemin
  113. anaSig = AnalogSignal(sig, units=unit, sampling_rate=sampling_rate,
  114. t_start=t_start, copy=False)
  115. if lazy:
  116. # TODO
  117. anaSig.lazy_shape = None
  118. seg.analogsignals.append(anaSig)
  119. seg.create_many_to_one_relationship()
  120. return seg
  121. def write_segment(self, segment, dtype='f4', rangemin=-10,
  122. rangemax=10, bytesoffset=0):
  123. """
  124. **Arguments**
  125. segment : the segment to write. Only analog signals will be written.
  126. dtype : dtype of the data
  127. rangemin , rangemax : if the dtype is integer, range can give in volt the min and the max of the range
  128. """
  129. if bytesoffset:
  130. raise NotImplementedError('bytesoffset values other than 0 ' +
  131. 'not supported')
  132. dtype = np.dtype(dtype)
  133. # all AnaologSignal from Segment must have the same length
  134. for anasig in segment.analogsignals[1:]:
  135. assert anasig.size == segment.analogsignals[0].size
  136. sigs = np.empty((segment.analogsignals[0].size, len(segment.analogsignals)))
  137. for i, anasig in enumerate(segment.analogsignals):
  138. sigs[:, i] = anasig.magnitude
  139. if dtype.kind == 'i':
  140. sigs -= ( rangemax+rangemin )/2.
  141. sigs /= (rangemax - rangemin)
  142. sigs *= 2 ** (8 * dtype.itemsize )
  143. elif dtype.kind == 'u' :
  144. sigs -= rangemin
  145. sigs /= (rangemax - rangemin)
  146. sigs *= 2 ** (8 * dtype.itemsize)
  147. sigs = sigs.astype(dtype)
  148. f = open(self.filename, 'wb')
  149. f.write(sigs.tostring())
  150. f.close()