Scheduled service maintenance on November 22


On Friday, November 22, 2024, between 06:00 CET and 18:00 CET, GIN services will undergo planned maintenance. Extended service interruptions should be expected. We will try to keep downtimes to a minimum, but recommend that users avoid critical tasks, large data uploads, or DOI requests during this time.

We apologize for any inconvenience.

1
0

asciisignalio.py 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214
  1. # -*- coding: utf-8 -*-
  2. """
  3. Class for reading/writing analog signals in a text file.
  4. Each column represents an AnalogSignal. All AnalogSignals have the same sampling rate.
  5. Covers many cases where part of a file can be viewed as CSV format.
  6. Supported : Read/Write
  7. Author: sgarcia
  8. """
  9. import csv
  10. import os
  11. import numpy as np
  12. import quantities as pq
  13. from neo.io.baseio import BaseIO
  14. from neo.core import AnalogSignal, Segment
  15. class AsciiSignalIO(BaseIO):
  16. """
  17. Class for reading signal in generic ascii format.
  18. Columns respresents signals. They all share the same sampling rate.
  19. The sampling rate is externally known or the first columns could hold the time
  20. vector.
  21. Usage:
  22. >>> from neo import io
  23. >>> r = io.AsciiSignalIO(filename='File_asciisignal_2.txt')
  24. >>> seg = r.read_segment(lazy=False, cascade=True)
  25. >>> print seg.analogsignals
  26. [<AnalogSignal(array([ 39.0625 , 0. , 0. , ..., -26.85546875 ...
  27. """
  28. is_readable = True
  29. is_writable = True
  30. supported_objects = [ Segment , AnalogSignal]
  31. readable_objects = [ Segment]
  32. writeable_objects = [Segment]
  33. has_header = False
  34. is_streameable = False
  35. read_params = {
  36. Segment : [
  37. ('delimiter' , {'value' : '\t', 'possible' : ['\t' , ' ' , ',' , ';'] }) ,
  38. ('usecols' , { 'value' : None , 'type' : int } ),
  39. ('skiprows' , { 'value' :0 } ),
  40. ('timecolumn' , { 'value' : None, 'type' : int } ) ,
  41. ('unit' , { 'value' : 'V', } ),
  42. ('sampling_rate' , { 'value' : 1000., } ),
  43. ('t_start' , { 'value' : 0., } ),
  44. ('method' , { 'value' : 'homemade', 'possible' : ['genfromtxt' , 'csv' , 'homemade' ] }) ,
  45. ]
  46. }
  47. write_params = {
  48. Segment : [
  49. ('delimiter' , {'value' : '\t', 'possible' : ['\t' , ' ' , ',' , ';'] }) ,
  50. ('writetimecolumn' , { 'value' : True, } ) ,
  51. ]
  52. }
  53. name = None
  54. extensions = [ 'txt' , 'asc', ]
  55. mode = 'file'
  56. def __init__(self , filename = None) :
  57. """
  58. This class read/write AnalogSignal in a text file.
  59. Each signal is a column.
  60. One of the column can be the time vector
  61. Arguments:
  62. filename : the filename to read/write
  63. """
  64. BaseIO.__init__(self)
  65. self.filename = filename
  66. def read_segment(self,
  67. lazy = False,
  68. cascade = True,
  69. delimiter = '\t',
  70. usecols = None,
  71. skiprows =0,
  72. timecolumn = None,
  73. sampling_rate = 1.*pq.Hz,
  74. t_start = 0.*pq.s,
  75. unit = pq.V,
  76. method = 'genfromtxt',
  77. ):
  78. """
  79. Arguments:
  80. delimiter : columns delimiter in file '\t' or one space or two space or ',' or ';'
  81. usecols : if None take all columns otherwise a list for selected columns
  82. skiprows : skip n first lines in case they contains header informations
  83. timecolumn : None or a valid int that point the time vector
  84. samplerate : the samplerate of signals if timecolumn is not None this is not take in account
  85. t_start : time of the first sample
  86. unit : unit of AnalogSignal can be a str or directly a Quantities
  87. method : 'genfromtxt' or 'csv' or 'homemade'
  88. in case of bugs you can try one of this methods
  89. 'genfromtxt' use numpy.genfromtxt
  90. 'csv' use cvs module
  91. 'homemade' use a intuitive more robust but slow method
  92. """
  93. seg = Segment(file_origin = os.path.basename(self.filename))
  94. if not cascade:
  95. return seg
  96. if type(sampling_rate) == float or type(sampling_rate)==int:
  97. # if not quantitities Hz by default
  98. sampling_rate = sampling_rate*pq.Hz
  99. if type(t_start) == float or type(t_start)==int:
  100. # if not quantitities s by default
  101. t_start = t_start*pq.s
  102. unit = pq.Quantity(1, unit)
  103. #loadtxt
  104. if method == 'genfromtxt' :
  105. sig = np.genfromtxt(self.filename,
  106. delimiter = delimiter,
  107. usecols = usecols ,
  108. skip_header = skiprows,
  109. dtype = 'f')
  110. if len(sig.shape) ==1:
  111. sig = sig[:, np.newaxis]
  112. elif method == 'csv' :
  113. tab = [l for l in csv.reader( file(self.filename,'rU') , delimiter = delimiter ) ]
  114. tab = tab[skiprows:]
  115. sig = np.array( tab , dtype = 'f')
  116. elif method == 'homemade' :
  117. fid = open(self.filename,'rU')
  118. for l in range(skiprows):
  119. fid.readline()
  120. tab = [ ]
  121. for line in fid.readlines():
  122. line = line.replace('\r','')
  123. line = line.replace('\n','')
  124. l = line.split(delimiter)
  125. while '' in l :
  126. l.remove('')
  127. tab.append(l)
  128. sig = np.array( tab , dtype = 'f')
  129. if timecolumn is not None:
  130. sampling_rate = 1./np.mean(np.diff(sig[:,timecolumn])) * pq.Hz
  131. t_start = sig[0,timecolumn] * pq.s
  132. for i in range(sig.shape[1]) :
  133. if timecolumn == i : continue
  134. if usecols is not None and i not in usecols: continue
  135. if lazy:
  136. signal = [ ]*unit
  137. else:
  138. signal = sig[:,i]*unit
  139. anaSig = AnalogSignal(signal, sampling_rate=sampling_rate,
  140. t_start=t_start, channel_index=i,
  141. name='Column %d'%i)
  142. if lazy:
  143. anaSig.lazy_shape = sig.shape
  144. seg.analogsignals.append( anaSig )
  145. seg.create_many_to_one_relationship()
  146. return seg
  147. def write_segment(self, segment,
  148. delimiter = '\t',
  149. skiprows =0,
  150. writetimecolumn = True,
  151. ):
  152. """
  153. Write a segment and AnalogSignal in a text file.
  154. **Arguments**
  155. delimiter : columns delimiter in file '\t' or one space or two space or ',' or ';'
  156. writetimecolumn : True or Flase write time vector as first column
  157. """
  158. if skiprows:
  159. raise NotImplementedError('skiprows values other than 0 are not ' +
  160. 'supported')
  161. l = [ ]
  162. if writetimecolumn is not None:
  163. l.append(segment.analogsignals[0].times[:, np.newaxis])
  164. for anaSig in segment.analogsignals:
  165. l.append(anaSig.magnitude[:, np.newaxis])
  166. sigs = np.concatenate(l, axis=1)
  167. #print sigs.shape
  168. np.savetxt(self.filename , sigs , delimiter = delimiter)