123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672 |
- # -*- coding: utf-8 -*-
- '''
- This module implements :class:`AnalogSignal`, an array of analog signals.
- :class:`AnalogSignal` inherits from :class:`quantites.Quantity`, which
- inherits from :class:`numpy.array`.
- Inheritance from :class:`numpy.array` is explained here:
- http://docs.scipy.org/doc/numpy/user/basics.subclassing.html
- In brief:
- * Initialization of a new object from constructor happens in :meth:`__new__`.
- This is where user-specified attributes are set.
- * :meth:`__array_finalize__` is called for all new objects, including those
- created by slicing. This is where attributes are copied over from
- the old object.
- '''
- # needed for python 3 compatibility
- from __future__ import absolute_import, division, print_function
- import logging
- import numpy as np
- import quantities as pq
- from neo.core.baseneo import BaseNeo, MergeError, merge_annotations
- from neo.core.channelindex import ChannelIndex
- logger = logging.getLogger("Neo")
- def _get_sampling_rate(sampling_rate, sampling_period):
- '''
- Gets the sampling_rate from either the sampling_period or the
- sampling_rate, or makes sure they match if both are specified
- '''
- if sampling_period is None:
- if sampling_rate is None:
- raise ValueError("You must provide either the sampling rate or " +
- "sampling period")
- elif sampling_rate is None:
- sampling_rate = 1.0 / sampling_period
- elif sampling_period != 1.0 / sampling_rate:
- raise ValueError('The sampling_rate has to be 1/sampling_period')
- if not hasattr(sampling_rate, 'units'):
- raise TypeError("Sampling rate/sampling period must have units")
- return sampling_rate
- def _new_AnalogSignalArray(cls, signal, units=None, dtype=None, copy=True,
- t_start=0*pq.s, sampling_rate=None,
- sampling_period=None, name=None, file_origin=None,
- description=None, annotations=None,
- channel_index=None, segment=None):
- '''
- A function to map AnalogSignal.__new__ to function that
- does not do the unit checking. This is needed for pickle to work.
- '''
- obj = cls(signal=signal, units=units, dtype=dtype, copy=copy,
- t_start=t_start, sampling_rate=sampling_rate,
- sampling_period=sampling_period, name=name,
- file_origin=file_origin, description=description,
- **annotations)
- obj.channel_index = channel_index
- obj.segment = segment
- return obj
- class AnalogSignal(BaseNeo, pq.Quantity):
- '''
- Array of one or more continuous analog signals.
- A representation of several continuous, analog signals that
- have the same duration, sampling rate and start time.
- Basically, it is a 2D array: dim 0 is time, dim 1 is
- channel index
- Inherits from :class:`quantities.Quantity`, which in turn inherits from
- :class:`numpy.ndarray`.
- *Usage*::
- >>> from neo.core import AnalogSignal
- >>> import quantities as pq
- >>>
- >>> sigarr = AnalogSignal([[1, 2, 3], [4, 5, 6]], units='V',
- ... sampling_rate=1*pq.Hz)
- >>>
- >>> sigarr
- <AnalogSignal(array([[1, 2, 3],
- [4, 5, 6]]) * mV, [0.0 s, 2.0 s], sampling rate: 1.0 Hz)>
- >>> sigarr[:,1]
- <AnalogSignal(array([2, 5]) * V, [0.0 s, 2.0 s],
- sampling rate: 1.0 Hz)>
- >>> sigarr[1, 1]
- array(5) * V
- *Required attributes/properties*:
- :signal: (quantity array 2D, numpy array 2D, or list (data, channel))
- The data itself.
- :units: (quantity units) Required if the signal is a list or NumPy
- array, not if it is a :class:`Quantity`
- :t_start: (quantity scalar) Time when signal begins
- :sampling_rate: *or* **sampling_period** (quantity scalar) Number of
- samples per unit time or
- interval between two samples.
- If both are specified, they are
- checked for consistency.
- *Recommended attributes/properties*:
- :name: (str) A label for the dataset.
- :description: (str) Text description.
- :file_origin: (str) Filesystem path or URL of the original data file.
- *Optional attributes/properties*:
- :dtype: (numpy dtype or str) Override the dtype of the signal array.
- :copy: (bool) True by default.
- Note: Any other additional arguments are assumed to be user-specific
- metadata and stored in :attr:`annotations`.
- *Properties available on this object*:
- :sampling_rate: (quantity scalar) Number of samples per unit time.
- (1/:attr:`sampling_period`)
- :sampling_period: (quantity scalar) Interval between two samples.
- (1/:attr:`quantity scalar`)
- :duration: (Quantity) Signal duration, read-only.
- (size * :attr:`sampling_period`)
- :t_stop: (quantity scalar) Time when signal ends, read-only.
- (:attr:`t_start` + :attr:`duration`)
- :times: (quantity 1D) The time points of each sample of the signal,
- read-only.
- (:attr:`t_start` + arange(:attr:`shape`[0])/:attr:`sampling_rate`)
- :channel_index:
- access to the channel_index attribute of the principal ChannelIndex
- associated with this signal.
- *Slicing*:
- :class:`AnalogSignal` objects can be sliced. When taking a single
- column (dimension 0, e.g. [0, :]) or a single element,
- a :class:`~quantities.Quantity` is returned.
- Otherwise an :class:`AnalogSignal` (actually a view) is
- returned, with the same metadata, except that :attr:`t_start`
- is changed if the start index along dimension 1 is greater than 1.
- *Operations available on this object*:
- == != + * /
- '''
- _single_parent_objects = ('Segment', 'ChannelIndex')
- _quantity_attr = 'signal'
- _necessary_attrs = (('signal', pq.Quantity, 2),
- ('sampling_rate', pq.Quantity, 0),
- ('t_start', pq.Quantity, 0))
- _recommended_attrs = BaseNeo._recommended_attrs
- def __new__(cls, signal, units=None, dtype=None, copy=True,
- t_start=0 * pq.s, sampling_rate=None, sampling_period=None,
- name=None, file_origin=None, description=None,
- **annotations):
- '''
- Constructs new :class:`AnalogSignal` from data.
- This is called whenever a new class:`AnalogSignal` is created from
- the constructor, but not when slicing.
- __array_finalize__ is called on the new object.
- '''
- if units is None:
- if not hasattr(signal, "units"):
- raise ValueError("Units must be specified")
- elif isinstance(signal, pq.Quantity):
- # could improve this test, what if units is a string?
- if units != signal.units:
- signal = signal.rescale(units)
- obj = pq.Quantity(signal, units=units, dtype=dtype, copy=copy).view(cls)
- if obj.ndim == 1:
- obj.shape = (-1, 1)
- if t_start is None:
- raise ValueError('t_start cannot be None')
- obj._t_start = t_start
- obj._sampling_rate = _get_sampling_rate(sampling_rate, sampling_period)
- obj.segment = None
- obj.channel_index = None
- return obj
- def __init__(self, signal, units=None, dtype=None, copy=True,
- t_start=0 * pq.s, sampling_rate=None, sampling_period=None,
- name=None, file_origin=None, description=None,
- **annotations):
- '''
- Initializes a newly constructed :class:`AnalogSignal` instance.
- '''
- # This method is only called when constructing a new AnalogSignal,
- # not when slicing or viewing. We use the same call signature
- # as __new__ for documentation purposes. Anything not in the call
- # signature is stored in annotations.
- # Calls parent __init__, which grabs universally recommended
- # attributes and sets up self.annotations
- BaseNeo.__init__(self, name=name, file_origin=file_origin,
- description=description, **annotations)
- def __reduce__(self):
- '''
- Map the __new__ function onto _new_AnalogSignalArray, so that pickle
- works
- '''
- return _new_AnalogSignalArray, (self.__class__,
- np.array(self),
- self.units,
- self.dtype,
- True,
- self.t_start,
- self.sampling_rate,
- self.sampling_period,
- self.name,
- self.file_origin,
- self.description,
- self.annotations,
- self.channel_index,
- self.segment)
- def __array_finalize__(self, obj):
- '''
- This is called every time a new :class:`AnalogSignal` is created.
- It is the appropriate place to set default values for attributes
- for :class:`AnalogSignal` constructed by slicing or viewing.
- User-specified values are only relevant for construction from
- constructor, and these are set in __new__. Then they are just
- copied over here.
- '''
- super(AnalogSignal, self).__array_finalize__(obj)
- self._t_start = getattr(obj, '_t_start', 0 * pq.s)
- self._sampling_rate = getattr(obj, '_sampling_rate', None)
-
- # The additional arguments
- self.annotations = getattr(obj, 'annotations', {})
- # Globally recommended attributes
- self.name = getattr(obj, 'name', None)
- self.file_origin = getattr(obj, 'file_origin', None)
- self.description = getattr(obj, 'description', None)
- # Parents objects
- self.segment = getattr(obj, 'segment', None)
- self.channel_index = getattr(obj, 'channel_index', None)
- def __repr__(self):
- '''
- Returns a string representing the :class:`AnalogSignal`.
- '''
- return ('<%s(%s, [%s, %s], sampling rate: %s)>' %
- (self.__class__.__name__,
- super(AnalogSignal, self).__repr__(), self.t_start,
- self.t_stop, self.sampling_rate))
- def get_channel_index(self):
- """
- """
- if self.channel_index:
- return self.channel_index.index
- else:
- return None
- def __getslice__(self, i, j):
- '''
- Get a slice from :attr:`i` to :attr:`j`.
- Doesn't get called in Python 3, :meth:`__getitem__` is called instead
- '''
- return self.__getitem__(slice(i, j))
- def __getitem__(self, i):
- '''
- Get the item or slice :attr:`i`.
- '''
- obj = super(AnalogSignal, self).__getitem__(i)
- if isinstance(i, int): # a single point in time across all channels
- obj = pq.Quantity(obj.magnitude, units=obj.units)
- elif isinstance(i, tuple):
- j, k = i
- if isinstance(j, int): # extract a quantity array
- obj = pq.Quantity(obj.magnitude, units=obj.units)
- else:
- if isinstance(j, slice):
- if j.start:
- obj.t_start = (self.t_start +
- j.start * self.sampling_period)
- if j.step:
- obj.sampling_period *= j.step
- elif isinstance(j, np.ndarray):
- raise NotImplementedError("Arrays not yet supported")
- # in the general case, would need to return IrregularlySampledSignal(Array)
- else:
- raise TypeError("%s not supported" % type(j))
- if isinstance(k, int):
- obj = obj.reshape(-1, 1)
- if self.channel_index:
- obj.channel_index = self.channel_index.__getitem__(k)
- elif isinstance(i, slice):
- if i.start:
- obj.t_start = self.t_start + i.start * self.sampling_period
- else:
- raise IndexError("index should be an integer, tuple or slice")
- return obj
- def __setitem__(self, i, value):
- """
- Set an item or slice defined by :attr:`i` to `value`.
- """
- # because AnalogSignals are always at least two-dimensional,
- # we need to handle the case where `i` is an integer
- if isinstance(i, int):
- i = slice(i, i + 1)
- elif isinstance(i, tuple):
- j, k = i
- if isinstance(k, int):
- i = (j, slice(k, k + 1))
- return super(AnalogSignal, self).__setitem__(i, value)
- # sampling_rate attribute is handled as a property so type checking can
- # be done
- @property
- def sampling_rate(self):
- '''
- Number of samples per unit time.
- (1/:attr:`sampling_period`)
- '''
- return self._sampling_rate
- @sampling_rate.setter
- def sampling_rate(self, rate):
- '''
- Setter for :attr:`sampling_rate`
- '''
- if rate is None:
- raise ValueError('sampling_rate cannot be None')
- elif not hasattr(rate, 'units'):
- raise ValueError('sampling_rate must have units')
- self._sampling_rate = rate
- # sampling_period attribute is handled as a property on underlying rate
- @property
- def sampling_period(self):
- '''
- Interval between two samples.
- (1/:attr:`sampling_rate`)
- '''
- return 1. / self.sampling_rate
- @sampling_period.setter
- def sampling_period(self, period):
- '''
- Setter for :attr:`sampling_period`
- '''
- if period is None:
- raise ValueError('sampling_period cannot be None')
- elif not hasattr(period, 'units'):
- raise ValueError('sampling_period must have units')
- self.sampling_rate = 1. / period
- # t_start attribute is handled as a property so type checking can be done
- @property
- def t_start(self):
- '''
- Time when signal begins.
- '''
- return self._t_start
- @t_start.setter
- def t_start(self, start):
- '''
- Setter for :attr:`t_start`
- '''
- if start is None:
- raise ValueError('t_start cannot be None')
- self._t_start = start
- @property
- def duration(self):
- '''
- Signal duration
- (:attr:`size` * :attr:`sampling_period`)
- '''
- return self.shape[0] / self.sampling_rate
- @property
- def t_stop(self):
- '''
- Time when signal ends.
- (:attr:`t_start` + :attr:`duration`)
- '''
- return self.t_start + self.duration
- @property
- def times(self):
- '''
- The time points of each sample of the signal
- (:attr:`t_start` + arange(:attr:`shape`)/:attr:`sampling_rate`)
- '''
- return self.t_start + np.arange(self.shape[0]) / self.sampling_rate
- def rescale(self, units):
- '''
- Return a copy of the AnalogSignal converted to the specified
- units
- '''
- to_dims = pq.quantity.validate_dimensionality(units)
- if self.dimensionality == to_dims:
- to_u = self.units
- signal = np.array(self)
- else:
- to_u = pq.Quantity(1.0, to_dims)
- from_u = pq.Quantity(1.0, self.dimensionality)
- try:
- cf = pq.quantity.get_conversion_factor(from_u, to_u)
- except AssertionError:
- raise ValueError('Unable to convert between units of "%s" \
- and "%s"' % (from_u._dimensionality,
- to_u._dimensionality))
- signal = cf * self.magnitude
- new = self.__class__(signal=signal, units=to_u,
- sampling_rate=self.sampling_rate)
- new._copy_data_complement(self)
- new.annotations.update(self.annotations)
- return new
- def duplicate_with_new_array(self, signal):
- '''
- Create a new :class:`AnalogSignal` with the same metadata
- but different data
- '''
- #signal is the new signal
- new = self.__class__(signal=signal, units=self.units,
- sampling_rate=self.sampling_rate)
- new._copy_data_complement(self)
- new.annotations.update(self.annotations)
- return new
- def __eq__(self, other):
- '''
- Equality test (==)
- '''
- if (self.t_start != other.t_start or
- self.sampling_rate != other.sampling_rate):
- return False
- return super(AnalogSignal, self).__eq__(other)
- def __ne__(self, other):
- '''
- Non-equality test (!=)
- '''
- return not self.__eq__(other)
- def _check_consistency(self, other):
- '''
- Check if the attributes of another :class:`AnalogSignal`
- are compatible with this one.
- '''
- if isinstance(other, AnalogSignal):
- for attr in "t_start", "sampling_rate":
- if getattr(self, attr) != getattr(other, attr):
- raise ValueError("Inconsistent values of %s" % attr)
- # how to handle name and annotations?
- def _copy_data_complement(self, other):
- '''
- Copy the metadata from another :class:`AnalogSignal`.
- '''
- for attr in ("t_start", "sampling_rate", "name", "file_origin",
- "description", "annotations"):
- setattr(self, attr, getattr(other, attr, None))
- def _apply_operator(self, other, op, *args):
- '''
- Handle copying metadata to the new :class:`AnalogSignal`
- after a mathematical operation.
- '''
- self._check_consistency(other)
- f = getattr(super(AnalogSignal, self), op)
- new_signal = f(other, *args)
- new_signal._copy_data_complement(self)
- return new_signal
- def __add__(self, other, *args):
- '''
- Addition (+)
- '''
- return self._apply_operator(other, "__add__", *args)
- def __sub__(self, other, *args):
- '''
- Subtraction (-)
- '''
- return self._apply_operator(other, "__sub__", *args)
- def __mul__(self, other, *args):
- '''
- Multiplication (*)
- '''
- return self._apply_operator(other, "__mul__", *args)
- def __truediv__(self, other, *args):
- '''
- Float division (/)
- '''
- return self._apply_operator(other, "__truediv__", *args)
- def __div__(self, other, *args):
- '''
- Integer division (//)
- '''
- return self._apply_operator(other, "__div__", *args)
- __radd__ = __add__
- __rmul__ = __sub__
- def __rsub__(self, other, *args):
- '''
- Backwards subtraction (other-self)
- '''
- return self.__mul__(-1, *args) + other
- def _repr_pretty_(self, pp, cycle):
- '''
- Handle pretty-printing the :class:`AnalogSignal`.
- '''
- pp.text("{cls} with {channels} channels of length {length}; "
- "units {units}; datatype {dtype} ".format(
- cls=self.__class__.__name__,
- channels=self.shape[1],
- length=self.shape[0],
- units=self.units.dimensionality.string,
- dtype=self.dtype))
- if self._has_repr_pretty_attrs_():
- pp.breakable()
- self._repr_pretty_attrs_(pp, cycle)
- def _pp(line):
- pp.breakable()
- with pp.group(indent=1):
- pp.text(line)
- for line in ["sampling rate: {0}".format(self.sampling_rate),
- "time: {0} to {1}".format(self.t_start, self.t_stop)
- ]:
- _pp(line)
- def time_slice(self, t_start, t_stop):
- '''
- Creates a new AnalogSignal corresponding to the time slice of the
- original AnalogSignal between times t_start, t_stop. Note, that for
- numerical stability reasons if t_start, t_stop do not fall exactly on
- the time bins defined by the sampling_period they will be rounded to
- the nearest sampling bins.
- '''
- # checking start time and transforming to start index
- if t_start is None:
- i = 0
- else:
- t_start = t_start.rescale(self.sampling_period.units)
- i = (t_start - self.t_start) / self.sampling_period
- i = int(np.rint(i.magnitude))
- # checking stop time and transforming to stop index
- if t_stop is None:
- j = len(self)
- else:
- t_stop = t_stop.rescale(self.sampling_period.units)
- j = (t_stop - self.t_start) / self.sampling_period
- j = int(np.rint(j.magnitude))
- if (i < 0) or (j > len(self)):
- raise ValueError('t_start, t_stop have to be withing the analog \
- signal duration')
- # we're going to send the list of indicies so that we get *copy* of the
- # sliced data
- obj = super(AnalogSignal, self).__getitem__(np.arange(i, j, 1))
- obj.t_start = self.t_start + i * self.sampling_period
- return obj
- def merge(self, other):
- '''
- Merge another :class:`AnalogSignal` into this one.
- The :class:`AnalogSignal` objects are concatenated horizontally
- (column-wise, :func:`np.hstack`).
- If the attributes of the two :class:`AnalogSignal` are not
- compatible, an Exception is raised.
- '''
- if self.sampling_rate != other.sampling_rate:
- raise MergeError("Cannot merge, different sampling rates")
- if self.t_start != other.t_start:
- raise MergeError("Cannot merge, different t_start")
- if self.segment != other.segment:
- raise MergeError("Cannot merge these two signals as they belong to different segments.")
- if hasattr(self, "lazy_shape"):
- if hasattr(other, "lazy_shape"):
- if self.lazy_shape[0] != other.lazy_shape[0]:
- raise MergeError("Cannot merge signals of different length.")
- merged_lazy_shape = (self.lazy_shape[0], self.lazy_shape[1] + other.lazy_shape[1])
- else:
- raise MergeError("Cannot merge a lazy object with a real object.")
- if other.units != self.units:
- other = other.rescale(self.units)
- stack = np.hstack(map(np.array, (self, other)))
- kwargs = {}
- for name in ("name", "description", "file_origin"):
- attr_self = getattr(self, name)
- attr_other = getattr(other, name)
- if attr_self == attr_other:
- kwargs[name] = attr_self
- else:
- kwargs[name] = "merge(%s, %s)" % (attr_self, attr_other)
- merged_annotations = merge_annotations(self.annotations,
- other.annotations)
- kwargs.update(merged_annotations)
- signal = AnalogSignal(stack, units=self.units, dtype=self.dtype,
- copy=False, t_start=self.t_start,
- sampling_rate=self.sampling_rate,
- **kwargs)
- signal.segment = self.segment
- # merge channel_index (move to ChannelIndex.merge()?)
- if self.channel_index and other.channel_index:
- signal.channel_index = ChannelIndex(
- index=np.arange(signal.shape[1]),
- channel_ids=np.hstack([self.channel_index.channel_ids,
- other.channel_index.channel_ids]),
- channel_names=np.hstack([self.channel_index.channel_names,
- other.channel_index.channel_names]))
- else:
- signal.channel_index = ChannelIndex(index=np.arange(signal.shape[1]))
- if hasattr(self, "lazy_shape"):
- signal.lazy_shape = merged_lazy_shape
- return signal
- def as_array(self, units=None):
- """
- Return the signal as a plain NumPy array.
- If `units` is specified, first rescale to those units.
- """
- if units:
- return self.rescale(units).magnitude
- else:
- return self.magnitude
- def as_quantity(self):
- """
- Return the signal as a quantities array.
- """
- return self.view(pq.Quantity)
|