Scheduled service maintenance on November 22


On Friday, November 22, 2024, between 06:00 CET and 18:00 CET, GIN services will undergo planned maintenance. Extended service interruptions should be expected. We will try to keep downtimes to a minimum, but recommend that users avoid critical tasks, large data uploads, or DOI requests during this time.

We apologize for any inconvenience.

asciisignalio.py 6.6 KB


  1. # -*- coding: utf-8 -*-
  2. """
  3. Class for reading/writing analog signals in a text file.
  4. Each column represents an AnalogSignal. All AnalogSignals have the same sampling rate.
  5. Covers many case when part of a file can be viewed as a CSV format.
  6. Supported : Read/Write
  7. Author: sgarcia
  8. """
  9. import csv
  10. import os
  11. import numpy as np
  12. import quantities as pq
  13. from neo.io.baseio import BaseIO
  14. from neo.core import AnalogSignal, Segment
  15. class AsciiSignalIO(BaseIO):
  16. """
  17. Class for reading signal in generic ascii format.
  18. Columns respresents signals. They all share the same sampling rate.
  19. The sampling rate is externally known or the first columns could hold the time
  20. vector.
  21. Usage:
  22. >>> from neo import io
  23. >>> r = io.AsciiSignalIO(filename='File_asciisignal_2.txt')
  24. >>> seg = r.read_segment()
  25. >>> print seg.analogsignals
  26. [<AnalogSignal(array([ 39.0625 , 0. , 0. , ..., -26.85546875 ...
  27. """
  28. is_readable = True
  29. is_writable = True
  30. supported_objects = [Segment, AnalogSignal]
  31. readable_objects = [Segment]
  32. writeable_objects = [Segment]
  33. has_header = False
  34. is_streameable = False
  35. read_params = {
  36. Segment: [
  37. ('delimiter', {'value': '\t', 'possible': ['\t', ' ', ',', ';']}),
  38. ('usecols', {'value': None, 'type': int}),
  39. ('skiprows', {'value': 0}),
  40. ('timecolumn', {'value': None, 'type': int}),
  41. ('unit', {'value': 'V', }),
  42. ('sampling_rate', {'value': 1000., }),
  43. ('t_start', {'value': 0., }),
  44. ('method', {'value': 'homemade', 'possible': ['genfromtxt', 'csv', 'homemade']}),
  45. ]
  46. }
  47. write_params = {
  48. Segment: [
  49. ('delimiter', {'value': '\t', 'possible': ['\t', ' ', ',', ';']}),
  50. ('writetimecolumn', {'value': True, }),
  51. ]
  52. }
  53. name = None
  54. extensions = ['txt', 'asc', ]
  55. mode = 'file'
  56. def __init__(self, filename=None):
  57. """
  58. This class read/write AnalogSignal in a text file.
  59. Each signal is a column.
  60. One of the column can be the time vector
  61. Arguments:
  62. filename : the filename to read/write
  63. """
  64. BaseIO.__init__(self)
  65. self.filename = filename
  66. def read_segment(self,
  67. lazy=False,
  68. delimiter='\t',
  69. usecols=None,
  70. skiprows=0,
  71. timecolumn=None,
  72. sampling_rate=1. * pq.Hz,
  73. t_start=0. * pq.s,
  74. unit=pq.V,
  75. method='genfromtxt',
  76. ):
  77. """
  78. Arguments:
  79. delimiter : columns delimiter in file '\t' or one space or two space or ',' or ';'
  80. usecols : if None take all columns otherwise a list for selected columns
  81. skiprows : skip n first lines in case they contains header informations
  82. timecolumn : None or a valid int that point the time vector
  83. samplerate : the samplerate of signals if timecolumn is not None this is not take in account
  84. t_start : time of the first sample
  85. unit : unit of AnalogSignal can be a str or directly a Quantities
  86. method : 'genfromtxt' or 'csv' or 'homemade'
  87. in case of bugs you can try one of this methods
  88. 'genfromtxt' use numpy.genfromtxt
  89. 'csv' use cvs module
  90. 'homemade' use a intuitive more robust but slow method
  91. """
  92. assert not lazy, 'Do not support lazy'
  93. seg = Segment(file_origin=os.path.basename(self.filename))
  94. if type(sampling_rate) == float or type(sampling_rate) == int:
  95. # if not quantitities Hz by default
  96. sampling_rate = sampling_rate * pq.Hz
  97. if type(t_start) == float or type(t_start) == int:
  98. # if not quantitities s by default
  99. t_start = t_start * pq.s
  100. unit = pq.Quantity(1, unit)
  101. # loadtxt
  102. if method == 'genfromtxt':
  103. sig = np.genfromtxt(self.filename,
  104. delimiter=delimiter,
  105. usecols=usecols,
  106. skip_header=skiprows,
  107. dtype='f')
  108. if len(sig.shape) == 1:
  109. sig = sig[:, np.newaxis]
  110. elif method == 'csv':
  111. tab = [l for l in csv.reader(file(self.filename, 'rU'), delimiter=delimiter)]
  112. tab = tab[skiprows:]
  113. sig = np.array(tab, dtype='f')
  114. elif method == 'homemade':
  115. fid = open(self.filename, 'rU')
  116. for l in range(skiprows):
  117. fid.readline()
  118. tab = []
  119. for line in fid.readlines():
  120. line = line.replace('\r', '')
  121. line = line.replace('\n', '')
  122. l = line.split(delimiter)
  123. while '' in l:
  124. l.remove('')
  125. tab.append(l)
  126. sig = np.array(tab, dtype='f')
  127. if timecolumn is not None:
  128. sampling_rate = 1. / np.mean(np.diff(sig[:, timecolumn])) * pq.Hz
  129. t_start = sig[0, timecolumn] * pq.s
  130. for i in range(sig.shape[1]):
  131. if timecolumn == i:
  132. continue
  133. if usecols is not None and i not in usecols:
  134. continue
  135. signal = sig[:, i] * unit
  136. anaSig = AnalogSignal(signal, sampling_rate=sampling_rate,
  137. t_start=t_start, channel_index=i,
  138. name='Column %d' % i)
  139. seg.analogsignals.append(anaSig)
  140. seg.create_many_to_one_relationship()
  141. return seg
  142. def write_segment(self, segment,
  143. delimiter='\t',
  144. skiprows=0,
  145. writetimecolumn=True,
  146. ):
  147. """
  148. Write a segment and AnalogSignal in a text file.
  149. **Arguments**
  150. delimiter : columns delimiter in file '\t' or one space or two space or ',' or ';'
  151. writetimecolumn : True or Flase write time vector as first column
  152. """
  153. if skiprows:
  154. raise NotImplementedError('skiprows values other than 0 are not ' +
  155. 'supported')
  156. l = []
  157. if writetimecolumn is not None:
  158. l.append(segment.analogsignals[0].times[:, np.newaxis])
  159. for anaSig in segment.analogsignals:
  160. l.append(anaSig.magnitude[:, np.newaxis])
  161. sigs = np.concatenate(l, axis=1)
  162. # print sigs.shape
  163. np.savetxt(self.filename, sigs, delimiter=delimiter)