""" Class for reading/writing analog signals in a text file. Each column represents an AnalogSignal. All AnalogSignals have the same sampling rate. Covers many cases when parts of a file can be viewed as a CSV format. Supported : Read/Write Author: sgarcia """ import csv import os import json import numpy as np import quantities as pq from neo.io.baseio import BaseIO from neo.core import AnalogSignal, IrregularlySampledSignal, Segment, Block class AsciiSignalIO(BaseIO): """ Class for reading signals in generic ascii format. Columns represent signals. They all share the same sampling rate. The sampling rate is externally known or the first column could hold the time vector. Usage: >>> from neo import io >>> r = io.AsciiSignalIO(filename='File_asciisignal_2.txt') >>> seg = r.read_segment() >>> print seg.analogsignals [ AnalogSignal sampling_rate = 1.0 / np.mean(np.diff(sig[:, self.timecolumn])) / self.time_units else: # not equally spaced --> IrregularlySampledSignal sampling_rate = None t_start = sig[0, self.timecolumn] * self.time_units if self.signal_group_mode == 'all-in-one': if self.timecolumn is not None: mask = list(range(sig.shape[1])) if self.timecolumn >= 0: mask.remove(self.timecolumn) else: # allow negative column index mask.remove(sig.shape[1] + self.timecolumn) signal = sig[:, mask] else: signal = sig if sampling_rate is None: irr_sig = IrregularlySampledSignal(signal[:, self.timecolumn] * self.time_units, signal * self.units, name='multichannel') seg.irregularlysampledsignals.append(irr_sig) else: ana_sig = AnalogSignal(signal * self.units, sampling_rate=sampling_rate, t_start=t_start, channel_index=self.usecols or np.arange(signal.shape[1]), name='multichannel') seg.analogsignals.append(ana_sig) else: if self.timecolumn is not None and self.timecolumn < 0: time_col = sig.shape[1] + self.timecolumn else: time_col = self.timecolumn for i in range(sig.shape[1]): if time_col == i: continue signal = sig[:, i] * self.units if sampling_rate is None: irr_sig = IrregularlySampledSignal(sig[:, time_col] * self.time_units, signal, t_start=t_start, channel_index=i, name='Column %d' % i) seg.irregularlysampledsignals.append(irr_sig) else: ana_sig = AnalogSignal(signal, sampling_rate=sampling_rate, t_start=t_start, channel_index=i, name='Column %d' % i) seg.analogsignals.append(ana_sig) seg.create_many_to_one_relationship() return seg def read_metadata(self): """ Read IO parameters from an associated JSON file """ # todo: also read annotations if self.metadata_filename is None: candidate = os.path.splitext(self.filename)[0] + "_about.json" if os.path.exists(candidate): self.metadata_filename = candidate else: return {} if os.path.exists(self.metadata_filename): with open(self.metadata_filename) as fp: metadata = json.load(fp) for key in "sampling_rate", "t_start": if key in metadata: metadata[key] = pq.Quantity(metadata[key]["value"], metadata[key]["units"]) for key in "units", "time_units": if key in metadata: metadata[key] = pq.Quantity(1, metadata[key]) return metadata else: return {} def write_segment(self, segment): """ Write a segment and AnalogSignal in a text file. """ # todo: check all analog signals have the same length, physical dimensions # and sampling rates l = [] if self.timecolumn is not None: if self.timecolumn != 0: raise NotImplementedError("Only column 0 currently supported for writing times") l.append(segment.analogsignals[0].times[:, np.newaxis].rescale(self.time_units)) # check signals are compatible (size, sampling rate), otherwise we # can't/shouldn't concatenate them # also set sampling_rate, t_start, units, time_units from signal(s) signal0 = segment.analogsignals[0] for attr in ("sampling_rate", "units", "shape"): val0 = getattr(signal0, attr) for signal in segment.analogsignals[1:]: val1 = getattr(signal, attr) if val0 != val1: raise Exception("Signals being written have different " + attr) setattr(self, attr, val0) # todo t_start, time_units self.time_units = signal0.times.units self.t_start = min(sig.t_start for sig in segment.analogsignals) for anaSig in segment.analogsignals: l.append(anaSig.rescale(self.units).magnitude) sigs = np.concatenate(l, axis=1) # print sigs.shape np.savetxt(self.filename, sigs, delimiter=self.delimiter) if self.metadata_filename is not None: self.write_metadata() def write_block(self, block): """ Can only write blocks containing a single segment. """ # in future, maybe separate segments by a blank link, or a "magic" comment if len(block.segments) > 1: raise ValueError("Can only write blocks containing a single segment." " This block contains {} segments.".format(len(block.segments))) self.write_segment(block.segments[0]) def write_metadata(self, metadata_filename=None): """ Write IO parameters to an associated JSON file """ # todo: also write annotations metadata = { "filename": self.filename, "delimiter": self.delimiter, "usecols": self.usecols, "skiprows": self.skiprows, "timecolumn": self.timecolumn, "sampling_rate": { "value": float(self.sampling_rate.magnitude), "units": self.sampling_rate.dimensionality.string }, "t_start": { "value": float(self.t_start.magnitude), "units": self.t_start.dimensionality.string }, "units": self.units.dimensionality.string, "time_units": self.time_units.dimensionality.string, "method": self.method, "signal_group_mode": self.signal_group_mode } if metadata_filename is None: if self.metadata_filename is None: self.metadata_filename = os.path.splitext(self.filename) + "_about.json" else: self.metadata_filename = metadata_filename with open(self.metadata_filename, "w") as fp: json.dump(metadata, fp) return self.metadata_filename