123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333 |
- '''
- This module implements :class:`BaseSignal`, an array of signals.
- This is a parent class from which all signal objects inherit:
- :class:`AnalogSignal` and :class:`IrregularlySampledSignal`
- :class:`BaseSignal` inherits from :class:`quantities.Quantity`, which
- inherits from :class:`numpy.array`.
- Inheritance from :class:`numpy.array` is explained here:
- http://docs.scipy.org/doc/numpy/user/basics.subclassing.html
- In brief:
- * Constructor :meth:`__new__` for :class:`BaseSignal` doesn't exist.
- Only child objects :class:`AnalogSignal` and :class:`IrregularlySampledSignal`
- can be created.
- '''
- import copy
- import logging
- from copy import deepcopy
- import numpy as np
- import quantities as pq
- from neo.core.baseneo import MergeError, merge_annotations
- from neo.core.dataobject import DataObject, ArrayDict
- from neo.core.channelindex import ChannelIndex
- logger = logging.getLogger("Neo")
- class BaseSignal(DataObject):
- '''
- This is the base class from which all signal objects inherit:
- :class:`AnalogSignal` and :class:`IrregularlySampledSignal`.
- This class contains all common methods of both child classes.
- It uses the following child class attributes:
- :_necessary_attrs: a list of the attributes that the class must have.
- :_recommended_attrs: a list of the attributes that the class may
- optionally have.
- '''
- def _array_finalize_spec(self, obj):
- '''
- Called by :meth:`__array_finalize__`, used to customize behaviour of sub-classes.
- '''
- return obj
- def __array_finalize__(self, obj):
- '''
- This is called every time a new signal is created.
- It is the appropriate place to set default values for attributes
- for a signal constructed by slicing or viewing.
- User-specified values are only relevant for construction from
- constructor, and these are set in __new__ in the child object.
- Then they are just copied over here. Default values for the
- specific attributes for subclasses (:class:`AnalogSignal`
- and :class:`IrregularlySampledSignal`) are set in
- :meth:`_array_finalize_spec`
- '''
- super().__array_finalize__(obj)
- self._array_finalize_spec(obj)
- # The additional arguments
- self.annotations = getattr(obj, 'annotations', {})
- # Add empty array annotations, because they cannot always be copied,
- # but do not overwrite existing ones from slicing etc.
- # This ensures the attribute exists
- if not hasattr(self, 'array_annotations'):
- self.array_annotations = ArrayDict(self._get_arr_ann_length())
- # Globally recommended attributes
- self.name = getattr(obj, 'name', None)
- self.file_origin = getattr(obj, 'file_origin', None)
- self.description = getattr(obj, 'description', None)
- # Parent objects
- self.segment = getattr(obj, 'segment', None)
- self.channel_index = getattr(obj, 'channel_index', None)
- @classmethod
- def _rescale(self, signal, units=None):
- '''
- Check that units are present, and rescale the signal if necessary.
- This is called whenever a new signal is
- created from the constructor. See :meth:`__new__' in
- :class:`AnalogSignal` and :class:`IrregularlySampledSignal`
- '''
- if units is None:
- if not hasattr(signal, "units"):
- raise ValueError("Units must be specified")
- elif isinstance(signal, pq.Quantity):
- # This test always returns True, i.e. rescaling is always executed if one of the units
- # is a pq.CompoundUnit. This is fine because rescaling is correct anyway.
- if pq.quantity.validate_dimensionality(units) != signal.dimensionality:
- signal = signal.rescale(units)
- return signal
- def rescale(self, units):
- obj = super().rescale(units)
- obj.channel_index = self.channel_index
- return obj
- def __getslice__(self, i, j):
- '''
- Get a slice from :attr:`i` to :attr:`j`.attr[0]
- Doesn't get called in Python 3, :meth:`__getitem__` is called instead
- '''
- return self.__getitem__(slice(i, j))
- def __ne__(self, other):
- '''
- Non-equality test (!=)
- '''
- return not self.__eq__(other)
- def _apply_operator(self, other, op, *args):
- '''
- Handle copying metadata to the new signal
- after a mathematical operation.
- '''
- self._check_consistency(other)
- f = getattr(super(), op)
- new_signal = f(other, *args)
- new_signal._copy_data_complement(self)
- # _copy_data_complement can't always copy array annotations,
- # so this needs to be done locally
- new_signal.array_annotations = copy.deepcopy(self.array_annotations)
- return new_signal
- def _get_required_attributes(self, signal, units):
- '''
- Return a list of the required attributes for a signal as a dictionary
- '''
- required_attributes = {}
- for attr in self._necessary_attrs:
- if attr[0] == "signal":
- required_attributes["signal"] = signal
- elif attr[0] == "image_data":
- required_attributes["image_data"] = signal
- elif attr[0] == "t_start":
- required_attributes["t_start"] = getattr(self, "t_start", 0.0 * pq.ms)
- else:
- required_attributes[str(attr[0])] = getattr(self, attr[0], None)
- required_attributes['units'] = units
- return required_attributes
- def duplicate_with_new_data(self, signal, units=None):
- '''
- Create a new signal with the same metadata but different data.
- Required attributes of the signal are used.
- Note: Array annotations can not be copied here because length of data can change
- '''
- if units is None:
- units = self.units
- # else:
- # units = pq.quantity.validate_dimensionality(units)
- # signal is the new signal
- required_attributes = self._get_required_attributes(signal, units)
- new = self.__class__(**required_attributes)
- new._copy_data_complement(self)
- new.annotations.update(self.annotations)
- # Note: Array annotations are not copied here, because it is not ensured
- # that the same number of signals is used and they would possibly make no sense
- # when combined with another signal
- return new
- def _copy_data_complement(self, other):
- '''
- Copy the metadata from another signal.
- Required and recommended attributes of the signal are used.
- Note: Array annotations can not be copied here because length of data can change
- '''
- all_attr = {self._recommended_attrs, self._necessary_attrs}
- for sub_at in all_attr:
- for attr in sub_at:
- if attr[0] == "t_start":
- setattr(self, attr[0], deepcopy(getattr(other, attr[0], 0.0 * pq.ms)))
- elif attr[0] != 'signal':
- setattr(self, attr[0], deepcopy(getattr(other, attr[0], None)))
- setattr(self, 'annotations', deepcopy(getattr(other, 'annotations', None)))
- # Note: Array annotations cannot be copied because length of data can be changed # here
- # which would cause inconsistencies
- def __rsub__(self, other, *args):
- '''
- Backwards subtraction (other-self)
- '''
- return self.__mul__(-1, *args) + other
- def __add__(self, other, *args):
- '''
- Addition (+)
- '''
- return self._apply_operator(other, "__add__", *args)
- def __sub__(self, other, *args):
- '''
- Subtraction (-)
- '''
- return self._apply_operator(other, "__sub__", *args)
- def __mul__(self, other, *args):
- '''
- Multiplication (*)
- '''
- return self._apply_operator(other, "__mul__", *args)
- def __truediv__(self, other, *args):
- '''
- Float division (/)
- '''
- return self._apply_operator(other, "__truediv__", *args)
- def __div__(self, other, *args):
- '''
- Integer division (//)
- '''
- return self._apply_operator(other, "__div__", *args)
- __radd__ = __add__
- __rmul__ = __sub__
- def merge(self, other):
- '''
- Merge another signal into this one.
- The signal objects are concatenated horizontally
- (column-wise, :func:`np.hstack`).
- If the attributes of the two signal are not
- compatible, an Exception is raised.
- Required attributes of the signal are used.
- '''
- for attr in self._necessary_attrs:
- if 'signal' != attr[0]:
- if getattr(self, attr[0], None) != getattr(other, attr[0], None):
- raise MergeError("Cannot merge these two signals as the %s differ." % attr[0])
- if self.segment != other.segment:
- raise MergeError(
- "Cannot merge these two signals as they belong to different segments.")
- if hasattr(self, "lazy_shape"):
- if hasattr(other, "lazy_shape"):
- if self.lazy_shape[0] != other.lazy_shape[0]:
- raise MergeError("Cannot merge signals of different length.")
- merged_lazy_shape = (self.lazy_shape[0], self.lazy_shape[1] + other.lazy_shape[1])
- else:
- raise MergeError("Cannot merge a lazy object with a real object.")
- if other.units != self.units:
- other = other.rescale(self.units)
- stack = np.hstack((self.magnitude, other.magnitude))
- kwargs = {}
- for name in ("name", "description", "file_origin"):
- attr_self = getattr(self, name)
- attr_other = getattr(other, name)
- if attr_self == attr_other:
- kwargs[name] = attr_self
- else:
- kwargs[name] = "merge({}, {})".format(attr_self, attr_other)
- merged_annotations = merge_annotations(self.annotations, other.annotations)
- kwargs.update(merged_annotations)
- kwargs['array_annotations'] = self._merge_array_annotations(other)
- signal = self.__class__(stack, units=self.units, dtype=self.dtype, copy=False,
- t_start=self.t_start, sampling_rate=self.sampling_rate, **kwargs)
- signal.segment = self.segment
- if hasattr(self, "lazy_shape"):
- signal.lazy_shape = merged_lazy_shape
- # merge channel_index (move to ChannelIndex.merge()?)
- if self.channel_index and other.channel_index:
- signal.channel_index = ChannelIndex(index=np.arange(signal.shape[1]),
- channel_ids=np.hstack(
- [self.channel_index.channel_ids,
- other.channel_index.channel_ids]),
- channel_names=np.hstack(
- [self.channel_index.channel_names,
- other.channel_index.channel_names]))
- else:
- signal.channel_index = ChannelIndex(index=np.arange(signal.shape[1]))
- return signal
- def time_slice(self, t_start, t_stop):
- '''
- Creates a new AnalogSignal corresponding to the time slice of the
- original Signal between times t_start, t_stop.
- '''
- NotImplementedError('Needs to be implemented for subclasses.')
- def concatenate(self, *signals):
- '''
- Concatenate multiple signals across time.
- The signal objects are concatenated vertically
- (row-wise, :func:`np.vstack`). Concatenation can be
- used to combine signals across segments.
- Note: Only (array) annotations common to
- both signals are attached to the concatenated signal.
- If the attributes of the signals are not
- compatible, an Exception is raised.
- Parameters
- ----------
- signals : multiple neo.BaseSignal objects
- The objects that is concatenated with this one.
- Returns
- -------
- signal : neo.BaseSignal
- Signal containing all non-overlapping samples of
- the source signals.
- Raises
- ------
- MergeError
- If `other` object has incompatible attributes.
- '''
- NotImplementedError('Patching need to be implemented in sublcasses')
|