123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205 |
- # -*- coding: utf-8 -*-
- """
- Class for reading/writing analog signals in a text file.
- Each column represents an AnalogSignal. All AnalogSignals have the same sampling rate.
- Covers many case when part of a file can be viewed as a CSV format.
- Supported : Read/Write
- Author: sgarcia
- """
- import csv
- import os
- import numpy as np
- import quantities as pq
- from neo.io.baseio import BaseIO
- from neo.core import AnalogSignal, Segment
- class AsciiSignalIO(BaseIO):
- """
- Class for reading signal in generic ascii format.
- Columns respresents signals. They all share the same sampling rate.
- The sampling rate is externally known or the first columns could hold the time
- vector.
- Usage:
- >>> from neo import io
- >>> r = io.AsciiSignalIO(filename='File_asciisignal_2.txt')
- >>> seg = r.read_segment()
- >>> print seg.analogsignals
- [<AnalogSignal(array([ 39.0625 , 0. , 0. , ..., -26.85546875 ...
- """
- is_readable = True
- is_writable = True
- supported_objects = [Segment, AnalogSignal]
- readable_objects = [Segment]
- writeable_objects = [Segment]
- has_header = False
- is_streameable = False
- read_params = {
- Segment: [
- ('delimiter', {'value': '\t', 'possible': ['\t', ' ', ',', ';']}),
- ('usecols', {'value': None, 'type': int}),
- ('skiprows', {'value': 0}),
- ('timecolumn', {'value': None, 'type': int}),
- ('unit', {'value': 'V', }),
- ('sampling_rate', {'value': 1000., }),
- ('t_start', {'value': 0., }),
- ('method', {'value': 'homemade', 'possible': ['genfromtxt', 'csv', 'homemade']}),
- ]
- }
- write_params = {
- Segment: [
- ('delimiter', {'value': '\t', 'possible': ['\t', ' ', ',', ';']}),
- ('writetimecolumn', {'value': True, }),
- ]
- }
- name = None
- extensions = ['txt', 'asc', ]
- mode = 'file'
- def __init__(self, filename=None):
- """
- This class read/write AnalogSignal in a text file.
- Each signal is a column.
- One of the column can be the time vector
- Arguments:
- filename : the filename to read/write
- """
- BaseIO.__init__(self)
- self.filename = filename
- def read_segment(self,
- lazy=False,
- delimiter='\t',
- usecols=None,
- skiprows=0,
- timecolumn=None,
- sampling_rate=1. * pq.Hz,
- t_start=0. * pq.s,
- unit=pq.V,
- method='genfromtxt',
- ):
- """
- Arguments:
- delimiter : columns delimiter in file '\t' or one space or two space or ',' or ';'
- usecols : if None take all columns otherwise a list for selected columns
- skiprows : skip n first lines in case they contains header informations
- timecolumn : None or a valid int that point the time vector
- samplerate : the samplerate of signals if timecolumn is not None this is not take in account
- t_start : time of the first sample
- unit : unit of AnalogSignal can be a str or directly a Quantities
- method : 'genfromtxt' or 'csv' or 'homemade'
- in case of bugs you can try one of this methods
- 'genfromtxt' use numpy.genfromtxt
- 'csv' use cvs module
- 'homemade' use a intuitive more robust but slow method
- """
- assert not lazy, 'Do not support lazy'
- seg = Segment(file_origin=os.path.basename(self.filename))
- if type(sampling_rate) == float or type(sampling_rate) == int:
- # if not quantitities Hz by default
- sampling_rate = sampling_rate * pq.Hz
- if type(t_start) == float or type(t_start) == int:
- # if not quantitities s by default
- t_start = t_start * pq.s
- unit = pq.Quantity(1, unit)
- # loadtxt
- if method == 'genfromtxt':
- sig = np.genfromtxt(self.filename,
- delimiter=delimiter,
- usecols=usecols,
- skip_header=skiprows,
- dtype='f')
- if len(sig.shape) == 1:
- sig = sig[:, np.newaxis]
- elif method == 'csv':
- tab = [l for l in csv.reader(file(self.filename, 'rU'), delimiter=delimiter)]
- tab = tab[skiprows:]
- sig = np.array(tab, dtype='f')
- elif method == 'homemade':
- fid = open(self.filename, 'rU')
- for l in range(skiprows):
- fid.readline()
- tab = []
- for line in fid.readlines():
- line = line.replace('\r', '')
- line = line.replace('\n', '')
- l = line.split(delimiter)
- while '' in l:
- l.remove('')
- tab.append(l)
- sig = np.array(tab, dtype='f')
- if timecolumn is not None:
- sampling_rate = 1. / np.mean(np.diff(sig[:, timecolumn])) * pq.Hz
- t_start = sig[0, timecolumn] * pq.s
- for i in range(sig.shape[1]):
- if timecolumn == i:
- continue
- if usecols is not None and i not in usecols:
- continue
- signal = sig[:, i] * unit
- anaSig = AnalogSignal(signal, sampling_rate=sampling_rate,
- t_start=t_start, channel_index=i,
- name='Column %d' % i)
- seg.analogsignals.append(anaSig)
- seg.create_many_to_one_relationship()
- return seg
- def write_segment(self, segment,
- delimiter='\t',
- skiprows=0,
- writetimecolumn=True,
- ):
- """
- Write a segment and AnalogSignal in a text file.
- **Arguments**
- delimiter : columns delimiter in file '\t' or one space or two space or ',' or ';'
- writetimecolumn : True or Flase write time vector as first column
- """
- if skiprows:
- raise NotImplementedError('skiprows values other than 0 are not ' +
- 'supported')
- l = []
- if writetimecolumn is not None:
- l.append(segment.analogsignals[0].times[:, np.newaxis])
- for anaSig in segment.analogsignals:
- l.append(anaSig.magnitude[:, np.newaxis])
- sigs = np.concatenate(l, axis=1)
- # print sigs.shape
- np.savetxt(self.filename, sigs, delimiter=delimiter)
|