123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107 |
- # -*- coding: utf-8 -*-
- """
- Class for reading data in a raw binary interleaved compact file.
- Sampling rate, units, number of channels and dtype must be externally known.
- This generic format is quite widely used in old acquisition systems
- and is quite universal for sharing data.
- The write part of this IO is only available at neo.io level with the other
- class RawBinarySignalIO
- Important release note:
- * Since the version neo 0.6.0 and the neo.rawio API,
- arguments of the IO (dtype, nb_channel, sampling_rate) must be
- given at the __init__ and not at read_segment() because there is
- no read_segment() in neo.rawio classes.
- Author: Samuel Garcia
- """
- from __future__ import unicode_literals, print_function, division, absolute_import
- from .baserawio import (BaseRawIO, _signal_channel_dtype, _unit_channel_dtype,
- _event_channel_dtype)
- import numpy as np
- import os
- import sys
class RawBinarySignalRawIO(BaseRawIO):
    """
    Raw IO for a binary file whose samples are interleaved across channels.

    The file is memory-mapped with the given ``dtype`` starting at
    ``bytesoffset`` bytes and viewed as a 2D array of shape
    ``(-1, nb_channel)``.  Nothing is read from the file to discover its
    layout: ``dtype``, ``sampling_rate`` and ``nb_channel`` are taken as
    given at construction time.

    ``signal_gain`` and ``signal_offset`` are only stored and copied into
    every entry of the ``signal_channels`` header array.
    """
    extensions = ['raw', '*']
    rawmode = 'one-file'

    def __init__(self, filename='', dtype='int16', sampling_rate=10000.,
                 nb_channel=2, signal_gain=1., signal_offset=0., bytesoffset=0):
        """Store the externally known description of the file; no I/O here."""
        BaseRawIO.__init__(self)
        self.filename = filename
        # Layout of the binary payload.
        self.dtype = dtype
        self.nb_channel = nb_channel
        self.bytesoffset = bytesoffset
        # Values reported as-is in the signal channel header.
        self.sampling_rate = sampling_rate
        self.signal_gain = signal_gain
        self.signal_offset = signal_offset

    def _source_name(self):
        """Return the file name given at construction."""
        return self.filename

    def _parse_header(self):
        """
        Memory-map the file (when it exists) and fill ``self.header``.

        When ``self.filename`` does not exist, ``self._raw_signals`` is set
        to None and the header lists no signal channel; this path exists
        because neo.io.RawBinarySignalIO uses this class for write_segment.
        """
        if os.path.exists(self.filename):
            flat = np.memmap(self.filename, dtype=self.dtype, mode='r',
                             offset=self.bytesoffset)
            # One row per sample frame, one column per channel.
            signals = flat.reshape(-1, self.nb_channel)
        else:
            signals = None
        self._raw_signals = signals

        # One header row per channel, all sharing rate/dtype/gain/offset,
        # with empty units and group id 0.
        if signals is None:
            channel_rows = []
        else:
            channel_rows = [
                ('ch{}'.format(idx), idx, self.sampling_rate, self.dtype,
                 '', self.signal_gain, self.signal_offset, 0)
                for idx in range(self.nb_channel)
            ]
        signal_table = np.array(channel_rows, dtype=_signal_channel_dtype)

        # This format carries neither events nor spikes.
        event_table = np.array([], dtype=_event_channel_dtype)
        unit_table = np.array([], dtype=_unit_channel_dtype)

        # A single block holding a single segment.
        self.header = {
            'nb_block': 1,
            'nb_segment': [1],
            'signal_channels': signal_table,
            'unit_channels': unit_table,
            'event_channels': event_table,
        }

        # Delegate annotation setup to the base class helper.
        self._generate_minimal_annotations()

    def _segment_t_start(self, block_index, seg_index):
        """Return 0., the start time reported for the segment."""
        return 0.

    def _segment_t_stop(self, block_index, seg_index):
        """Return the number of sample frames divided by the sampling rate."""
        nb_frames = self._raw_signals.shape[0]
        return nb_frames / self.sampling_rate

    def _get_signal_size(self, block_index, seg_index, channel_indexes):
        """Return the number of sample frames (rows of the mapped array)."""
        nb_frames = self._raw_signals.shape[0]
        return nb_frames

    def _get_signal_t_start(self, block_index, seg_index, channel_indexes):
        """Return 0., the start time reported for the signals."""
        return 0.

    def _get_analogsignal_chunk(self, block_index, seg_index, i_start, i_stop, channel_indexes):
        """
        Return rows ``i_start:i_stop`` of the mapped array for the selected
        channels; ``channel_indexes=None`` selects every channel.
        """
        columns = slice(None) if channel_indexes is None else channel_indexes
        return self._raw_signals[i_start:i_stop, columns]
|