asciisignalio.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392
"""
Class for reading/writing analog signals in a text file.
Each column represents an AnalogSignal. All AnalogSignals have the same sampling rate.
Covers many cases where parts of a file can be viewed as CSV format.

Supported: Read/Write

Author: sgarcia
"""
  8. import csv
  9. import os
  10. import json
  11. import numpy as np
  12. import quantities as pq
  13. from neo.io.baseio import BaseIO
  14. from neo.core import AnalogSignal, IrregularlySampledSignal, Segment, Block
  15. class AsciiSignalIO(BaseIO):
  16. """
  17. Class for reading signals in generic ascii format.
  18. Columns represent signals. They all share the same sampling rate.
  19. The sampling rate is externally known or the first column could hold the time vector.
  20. Usage:
  21. >>> from neo import io
  22. >>> r = io.AsciiSignalIO(filename='File_asciisignal_2.txt')
  23. >>> seg = r.read_segment()
  24. >>> print seg.analogsignals
  25. [<AnalogSignal(array([ 39.0625 , 0. , 0. , ..., -26.85546875 ...
  26. Arguments relevant for reading and writing:
  27. delimiter:
  28. column delimiter in file, e.g. '\t', one space, two spaces, ',', ';'
  29. timecolumn:
  30. None or a valid integer that identifies which column contains the time vector
  31. (counting from zero)
  32. units:
  33. units of AnalogSignal can be a str or directly a Quantity
  34. time_units:
  35. where timecolumn is specified, the time units must be specified as a string or
  36. Quantity
  37. metadata_filename:
  38. the path to a JSON file containing metadata
  39. Arguments relevant only for reading:
  40. usecols:
  41. if None take all columns otherwise a list for selected columns (counting from zero)
  42. skiprows:
  43. skip n first lines in case they contains header informations
  44. sampling_rate:
  45. the sampling rate of signals. Ignored if timecolumn is not None
  46. t_start:
  47. time of the first sample (Quantity). Ignored if timecolumn is not None
  48. signal_group_mode:
  49. if 'all-in-one', load data as a single, multi-channel AnalogSignal, if 'split-all'
  50. (default for backwards compatibility) load data as separate, single-channel
  51. AnalogSignals
  52. method:
  53. 'genfromtxt', 'csv', 'homemade' or a user-defined function which takes a filename and
  54. usecolumns as argument and returns a 2D NumPy array.
  55. If specifying both usecols and timecolumn, the latter should identify
  56. the column index _after_ removing the unused columns.
  57. The methods are as follows:
  58. - 'genfromtxt' use numpy.genfromtxt
  59. - 'csv' use csv module
  60. - 'homemade' use an intuitive, more robust but slow method
  61. If `metadata_filename` is provided, the parameters for reading/writing the file
  62. ("delimiter", "timecolumn", "units", etc.) will be read from that file.
  63. IF a metadata filename is not provided, the IO will look for a JSON file in the same
  64. directory with a matching filename, e.g. if the datafile was named "foo.txt" then the
  65. IO would automatically look for a file called "foo_about.json"
  66. If parameters are specified both in the metadata file and as arguments to the IO constructor,
  67. the former will take precedence.
  68. Example metadata file::
  69. {
  70. "filename": "foo.txt",
  71. "delimiter": " ",
  72. "timecolumn": 0,
  73. "units": "pA",
  74. "time_units": "ms",
  75. "sampling_rate": {
  76. "value": 1.0,
  77. "units": "kHz"
  78. },
  79. "method": "genfromtxt",
  80. "signal_group_mode": 'all-in-one'
  81. }
  82. """
  83. is_readable = True
  84. is_writable = True
  85. supported_objects = [Block, Segment, AnalogSignal]
  86. readable_objects = [Block, Segment]
  87. # can write a Block with a single segment, but not the general case
  88. writeable_objects = [Segment]
  89. has_header = False
  90. is_streameable = False
  91. read_params = {
  92. Segment: [
  93. ('delimiter', {'value': '\t', 'possible': ['\t', ' ', ',', ';']}),
  94. ('usecols', {'value': None, 'type': int}),
  95. ('skiprows', {'value': 0}),
  96. ('timecolumn', {'value': None, 'type': int}),
  97. ('units', {'value': 'V', }),
  98. ('time_units', {'value': pq.s, }),
  99. ('sampling_rate', {'value': 1.0 * pq.Hz, }),
  100. ('t_start', {'value': 0.0 * pq.s, }),
  101. ('method', {'value': 'homemade', 'possible': ['genfromtxt', 'csv', 'homemade']}),
  102. ('signal_group_mode', {'value': 'split-all'})
  103. ]
  104. }
  105. write_params = {
  106. Segment: [
  107. ('delimiter', {'value': '\t', 'possible': ['\t', ' ', ',', ';']}),
  108. ('writetimecolumn', {'value': True, }),
  109. ]
  110. }
  111. name = None
  112. extensions = ['txt', 'asc', 'csv', 'tsv']
  113. mode = 'file'
  114. def __init__(self, filename=None, delimiter='\t', usecols=None, skiprows=0, timecolumn=None,
  115. sampling_rate=1.0 * pq.Hz, t_start=0.0 * pq.s, units=pq.V, time_units=pq.s,
  116. method='genfromtxt', signal_group_mode='split-all', metadata_filename=None):
  117. """
  118. This class read/write AnalogSignal in a text file.
  119. Each signal is a column.
  120. One of the columns can be the time vector.
  121. Arguments:
  122. filename : the filename to read/write
  123. """
  124. # todo: allow units to be a list/array (e.g. current and voltage in the same file)
  125. BaseIO.__init__(self)
  126. self.filename = filename
  127. self.metadata_filename = metadata_filename
  128. metadata = self.read_metadata()
  129. self.delimiter = metadata.get("delimiter", delimiter)
  130. self.usecols = metadata.get("usecols", usecols)
  131. self.skiprows = metadata.get("skiprows", skiprows)
  132. self.timecolumn = metadata.get("timecolumn", timecolumn)
  133. self.sampling_rate = metadata.get("sampling_rate", sampling_rate)
  134. self.time_units = metadata.get("time_units", time_units)
  135. if self.time_units is not None:
  136. self.time_units = pq.Quantity(1, self.time_units)
  137. self.t_start = metadata.get("t_start", t_start)
  138. if not isinstance(t_start, pq.Quantity):
  139. if not isinstance(self.time_units, pq.Quantity):
  140. raise ValueError("Units of t_start not specified")
  141. self.t_start *= self.time_units
  142. self.units = metadata.get("units", pq.Quantity(1, units))
  143. self.method = metadata.get("method", method)
  144. if not(self.method in ('genfromtxt', 'csv', 'homemade') or callable(self.method)):
  145. raise ValueError(
  146. "method must be one of 'genfromtxt', 'csv', 'homemade', or a function")
  147. self.signal_group_mode = metadata.get("signal_group_mode", signal_group_mode)
  148. def read_block(self, lazy=False):
  149. block = Block(file_origin=os.path.basename(self.filename))
  150. segment = self.read_segment(lazy=lazy)
  151. segment.block = block
  152. block.segments.append(segment)
  153. return block
  154. def read_segment(self, lazy=False):
  155. """
  156. """
  157. if lazy:
  158. raise NotImplementedError("lazy mode not supported")
  159. seg = Segment(file_origin=os.path.basename(self.filename))
  160. # loadtxt
  161. if self.method == 'genfromtxt':
  162. sig = np.genfromtxt(self.filename,
  163. delimiter=self.delimiter,
  164. usecols=self.usecols,
  165. skip_header=self.skiprows,
  166. dtype='f')
  167. if len(sig.shape) == 1:
  168. sig = sig[:, np.newaxis]
  169. elif self.method == 'csv':
  170. with open(self.filename, 'rU') as fp:
  171. tab = [l for l in csv.reader(fp, delimiter=self.delimiter)]
  172. tab = tab[self.skiprows:]
  173. sig = np.array(tab, dtype='f')
  174. if self.usecols is not None:
  175. mask = np.array(self.usecols)
  176. sig = sig[:, mask]
  177. elif self.method == 'homemade':
  178. fid = open(self.filename, 'rU')
  179. for l in range(self.skiprows):
  180. fid.readline()
  181. tab = []
  182. for line in fid.readlines():
  183. line = line.replace('\r', '')
  184. line = line.replace('\n', '')
  185. parts = line.split(self.delimiter)
  186. while '' in parts:
  187. parts.remove('')
  188. tab.append(parts)
  189. sig = np.array(tab, dtype='f')
  190. if self.usecols is not None:
  191. mask = np.array(self.usecols)
  192. sig = sig[:, mask]
  193. else:
  194. sig = self.method(self.filename, self.usecols)
  195. if not isinstance(sig, np.ndarray):
  196. raise TypeError("method function must return a NumPy array")
  197. if len(sig.shape) == 1:
  198. sig = sig[:, np.newaxis]
  199. elif len(sig.shape) != 2:
  200. raise ValueError("method function must return a 1D or 2D NumPy array")
  201. if self.timecolumn is None:
  202. sampling_rate = self.sampling_rate
  203. t_start = self.t_start
  204. else:
  205. delta_t = np.diff(sig[:, self.timecolumn])
  206. mean_delta_t = np.mean(delta_t)
  207. if (delta_t.max() - delta_t.min()) / mean_delta_t < 1e-6:
  208. # equally spaced --> AnalogSignal
  209. sampling_rate = 1.0 / np.mean(np.diff(sig[:, self.timecolumn])) / self.time_units
  210. else:
  211. # not equally spaced --> IrregularlySampledSignal
  212. sampling_rate = None
  213. t_start = sig[0, self.timecolumn] * self.time_units
  214. if self.signal_group_mode == 'all-in-one':
  215. if self.timecolumn is not None:
  216. mask = list(range(sig.shape[1]))
  217. if self.timecolumn >= 0:
  218. mask.remove(self.timecolumn)
  219. else: # allow negative column index
  220. mask.remove(sig.shape[1] + self.timecolumn)
  221. signal = sig[:, mask]
  222. else:
  223. signal = sig
  224. if sampling_rate is None:
  225. irr_sig = IrregularlySampledSignal(signal[:, self.timecolumn] * self.time_units,
  226. signal * self.units,
  227. name='multichannel')
  228. seg.irregularlysampledsignals.append(irr_sig)
  229. else:
  230. ana_sig = AnalogSignal(signal * self.units, sampling_rate=sampling_rate,
  231. t_start=t_start,
  232. channel_index=self.usecols or np.arange(signal.shape[1]),
  233. name='multichannel')
  234. seg.analogsignals.append(ana_sig)
  235. else:
  236. if self.timecolumn is not None and self.timecolumn < 0:
  237. time_col = sig.shape[1] + self.timecolumn
  238. else:
  239. time_col = self.timecolumn
  240. for i in range(sig.shape[1]):
  241. if time_col == i:
  242. continue
  243. signal = sig[:, i] * self.units
  244. if sampling_rate is None:
  245. irr_sig = IrregularlySampledSignal(sig[:, time_col] * self.time_units,
  246. signal,
  247. t_start=t_start, channel_index=i,
  248. name='Column %d' % i)
  249. seg.irregularlysampledsignals.append(irr_sig)
  250. else:
  251. ana_sig = AnalogSignal(signal, sampling_rate=sampling_rate,
  252. t_start=t_start, channel_index=i,
  253. name='Column %d' % i)
  254. seg.analogsignals.append(ana_sig)
  255. seg.create_many_to_one_relationship()
  256. return seg
  257. def read_metadata(self):
  258. """
  259. Read IO parameters from an associated JSON file
  260. """
  261. # todo: also read annotations
  262. if self.metadata_filename is None:
  263. candidate = os.path.splitext(self.filename)[0] + "_about.json"
  264. if os.path.exists(candidate):
  265. self.metadata_filename = candidate
  266. else:
  267. return {}
  268. if os.path.exists(self.metadata_filename):
  269. with open(self.metadata_filename) as fp:
  270. metadata = json.load(fp)
  271. for key in "sampling_rate", "t_start":
  272. if key in metadata:
  273. metadata[key] = pq.Quantity(metadata[key]["value"], metadata[key]["units"])
  274. for key in "units", "time_units":
  275. if key in metadata:
  276. metadata[key] = pq.Quantity(1, metadata[key])
  277. return metadata
  278. else:
  279. return {}
  280. def write_segment(self, segment):
  281. """
  282. Write a segment and AnalogSignal in a text file.
  283. """
  284. # todo: check all analog signals have the same length, physical dimensions
  285. # and sampling rates
  286. l = []
  287. if self.timecolumn is not None:
  288. if self.timecolumn != 0:
  289. raise NotImplementedError("Only column 0 currently supported for writing times")
  290. l.append(segment.analogsignals[0].times[:, np.newaxis].rescale(self.time_units))
  291. # check signals are compatible (size, sampling rate), otherwise we
  292. # can't/shouldn't concatenate them
  293. # also set sampling_rate, t_start, units, time_units from signal(s)
  294. signal0 = segment.analogsignals[0]
  295. for attr in ("sampling_rate", "units", "shape"):
  296. val0 = getattr(signal0, attr)
  297. for signal in segment.analogsignals[1:]:
  298. val1 = getattr(signal, attr)
  299. if val0 != val1:
  300. raise Exception("Signals being written have different " + attr)
  301. setattr(self, attr, val0)
  302. # todo t_start, time_units
  303. self.time_units = signal0.times.units
  304. self.t_start = min(sig.t_start for sig in segment.analogsignals)
  305. for anaSig in segment.analogsignals:
  306. l.append(anaSig.rescale(self.units).magnitude)
  307. sigs = np.concatenate(l, axis=1)
  308. # print sigs.shape
  309. np.savetxt(self.filename, sigs, delimiter=self.delimiter)
  310. if self.metadata_filename is not None:
  311. self.write_metadata()
  312. def write_block(self, block):
  313. """
  314. Can only write blocks containing a single segment.
  315. """
  316. # in future, maybe separate segments by a blank link, or a "magic" comment
  317. if len(block.segments) > 1:
  318. raise ValueError("Can only write blocks containing a single segment."
  319. " This block contains {} segments.".format(len(block.segments)))
  320. self.write_segment(block.segments[0])
  321. def write_metadata(self, metadata_filename=None):
  322. """
  323. Write IO parameters to an associated JSON file
  324. """
  325. # todo: also write annotations
  326. metadata = {
  327. "filename": self.filename,
  328. "delimiter": self.delimiter,
  329. "usecols": self.usecols,
  330. "skiprows": self.skiprows,
  331. "timecolumn": self.timecolumn,
  332. "sampling_rate": {
  333. "value": float(self.sampling_rate.magnitude),
  334. "units": self.sampling_rate.dimensionality.string
  335. },
  336. "t_start": {
  337. "value": float(self.t_start.magnitude),
  338. "units": self.t_start.dimensionality.string
  339. },
  340. "units": self.units.dimensionality.string,
  341. "time_units": self.time_units.dimensionality.string,
  342. "method": self.method,
  343. "signal_group_mode": self.signal_group_mode
  344. }
  345. if metadata_filename is None:
  346. if self.metadata_filename is None:
  347. self.metadata_filename = os.path.splitext(self.filename) + "_about.json"
  348. else:
  349. self.metadata_filename = metadata_filename
  350. with open(self.metadata_filename, "w") as fp:
  351. json.dump(metadata, fp)
  352. return self.metadata_filename