123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371 |
- """
- ExampleRawIO is a class of a fake example.
- This is to be used when coding a new RawIO.
- Rules for creating a new class:
- 1. Step 1: Create the main class
- * Create a file in **neo/rawio/** that endith with "rawio.py"
- * Create the class that inherits BaseRawIO
- * copy/paste all methods that need to be implemented.
- See the end a neo.rawio.baserawio.BaseRawIO
- * code hard! The main difficulty **is _parse_header()**.
- In short you have a create a mandatory dict than
- contains channel informations::
- self.header = {}
- self.header['nb_block'] = 2
- self.header['nb_segment'] = [2, 3]
- self.header['signal_channels'] = sig_channels
- self.header['unit_channels'] = unit_channels
- self.header['event_channels'] = event_channels
- 2. Step 2: RawIO test:
- * create a file in neo/rawio/tests with the same name with "test_" prefix
- * copy paste neo/rawio/tests/test_examplerawio.py and do the same
- 3. Step 3 : Create the neo.io class with the wrapper
- * Create a file in neo/io/ that endith with "io.py"
- * Create a that inherits both your RawIO class and BaseFromRaw class
- * copy/paste from neo/io/exampleio.py
- 4.Step 4 : IO test
- * create a file in neo/test/iotest with the same previous name with "test_" prefix
- * copy/paste from neo/test/iotest/test_exampleio.py
- """
- from .baserawio import (BaseRawIO, _signal_channel_dtype, _unit_channel_dtype,
- _event_channel_dtype)
- import numpy as np
- class ExampleRawIO(BaseRawIO):
- """
- Class for "reading" fake data from an imaginary file.
- For the user, it give acces to raw data (signals, event, spikes) as they
- are in the (fake) file int16 and int64.
- For a developer, it is just an example showing guidelines for someone who wants
- to develop a new IO module.
- Two rules for developers:
- * Respect the :ref:`neo_rawio_API`
- * Follow the :ref:`io_guiline`
- This fake IO:
- * have 2 blocks
- * blocks have 2 and 3 segments
- * have 16 signal_channel sample_rate = 10000
- * have 3 unit_channel
- * have 2 event channel: one have *type=event*, the other have
- *type=epoch*
- Usage:
- >>> import neo.rawio
- >>> r = neo.rawio.ExampleRawIO(filename='itisafake.nof')
- >>> r.parse_header()
- >>> print(r)
- >>> raw_chunk = r.get_analogsignal_chunk(block_index=0, seg_index=0,
- i_start=0, i_stop=1024, channel_names=channel_names)
- >>> float_chunk = reader.rescale_signal_raw_to_float(raw_chunk, dtype='float64',
- channel_indexes=[0, 3, 6])
- >>> spike_timestamp = reader.spike_timestamps(unit_index=0, t_start=None, t_stop=None)
- >>> spike_times = reader.rescale_spike_timestamp(spike_timestamp, 'float64')
- >>> ev_timestamps, _, ev_labels = reader.event_timestamps(event_channel_index=0)
- """
- extensions = ['fake']
- rawmode = 'one-file'
- def __init__(self, filename=''):
- BaseRawIO.__init__(self)
- # note that this filename is ued in self._source_name
- self.filename = filename
- def _source_name(self):
- # this function is used by __repr__
- # for general cases self.filename is good
- # But for URL you could mask some part of the URL to keep
- # the main part.
- return self.filename
- def _parse_header(self):
- # This is the central of a RawIO
- # we need to collect in the original format all
- # informations needed for further fast acces
- # at any place in the file
- # In short _parse_header can be slow but
- # _get_analogsignal_chunk need to be as fast as possible
- # create signals channels information
- # This is mandatory!!!!
- # gain/offset/units are really important because
- # the scaling to real value will be done with that
- # at the end real_signal = (raw_signal* gain + offset) * pq.Quantity(units)
- sig_channels = []
- for c in range(16):
- ch_name = 'ch{}'.format(c)
- # our channel id is c+1 just for fun
- # Note that chan_id should be realated to
- # original channel id in the file format
- # so that the end user should not be lost when reading datasets
- chan_id = c + 1
- sr = 10000. # Hz
- dtype = 'int16'
- units = 'uV'
- gain = 1000. / 2 ** 16
- offset = 0.
- # group_id isonly for special cases when channel have diferents
- # sampling rate for instance. See TdtIO for that.
- # Here this is the general case :all channel have the same characteritics
- group_id = 0
- sig_channels.append((ch_name, chan_id, sr, dtype, units, gain, offset, group_id))
- sig_channels = np.array(sig_channels, dtype=_signal_channel_dtype)
- # creating units channels
- # This is mandatory!!!!
- # Note that if there is no waveform at all in the file
- # then wf_units/wf_gain/wf_offset/wf_left_sweep/wf_sampling_rate
- # can be set to any value because _spike_raw_waveforms
- # will return None
- unit_channels = []
- for c in range(3):
- unit_name = 'unit{}'.format(c)
- unit_id = '#{}'.format(c)
- wf_units = 'uV'
- wf_gain = 1000. / 2 ** 16
- wf_offset = 0.
- wf_left_sweep = 20
- wf_sampling_rate = 10000.
- unit_channels.append((unit_name, unit_id, wf_units, wf_gain,
- wf_offset, wf_left_sweep, wf_sampling_rate))
- unit_channels = np.array(unit_channels, dtype=_unit_channel_dtype)
- # creating event/epoch channel
- # This is mandatory!!!!
- # In RawIO epoch and event they are dealt the same way.
- event_channels = []
- event_channels.append(('Some events', 'ev_0', 'event'))
- event_channels.append(('Some epochs', 'ep_1', 'epoch'))
- event_channels = np.array(event_channels, dtype=_event_channel_dtype)
- # fille into header dict
- # This is mandatory!!!!!
- self.header = {}
- self.header['nb_block'] = 2
- self.header['nb_segment'] = [2, 3]
- self.header['signal_channels'] = sig_channels
- self.header['unit_channels'] = unit_channels
- self.header['event_channels'] = event_channels
- # insert some annotation at some place
- # at neo.io level IO are free to add some annoations
- # to any object. To keep this functionality with the wrapper
- # BaseFromRaw you can add annoations in a nested dict.
- self._generate_minimal_annotations()
- # If you are a lazy dev you can stop here.
- for block_index in range(2):
- bl_ann = self.raw_annotations['blocks'][block_index]
- bl_ann['name'] = 'Block #{}'.format(block_index)
- bl_ann['block_extra_info'] = 'This is the block {}'.format(block_index)
- for seg_index in range([2, 3][block_index]):
- seg_ann = bl_ann['segments'][seg_index]
- seg_ann['name'] = 'Seg #{} Block #{}'.format(
- seg_index, block_index)
- seg_ann['seg_extra_info'] = 'This is the seg {} of block {}'.format(
- seg_index, block_index)
- for c in range(16):
- anasig_an = seg_ann['signals'][c]
- anasig_an['info'] = 'This is a good signals'
- for c in range(3):
- spiketrain_an = seg_ann['units'][c]
- spiketrain_an['quality'] = 'Good!!'
- for c in range(2):
- event_an = seg_ann['events'][c]
- if c == 0:
- event_an['nickname'] = 'Miss Event 0'
- elif c == 1:
- event_an['nickname'] = 'MrEpoch 1'
- def _segment_t_start(self, block_index, seg_index):
- # this must return an float scale in second
- # this t_start will be shared by all object in the segment
- # except AnalogSignal
- all_starts = [[0., 15.], [0., 20., 60.]]
- return all_starts[block_index][seg_index]
- def _segment_t_stop(self, block_index, seg_index):
- # this must return an float scale in second
- all_stops = [[10., 25.], [10., 30., 70.]]
- return all_stops[block_index][seg_index]
- def _get_signal_size(self, block_index, seg_index, channel_indexes=None):
- # we are lucky: signals in all segment have the same shape!! (10.0 seconds)
- # it is not always the case
- # this must return an int = the number of sample
- # Note that channel_indexes can be ignored for most cases
- # except for several sampling rate.
- return 100000
- def _get_signal_t_start(self, block_index, seg_index, channel_indexes):
- # This give the t_start of signals.
- # Very often this equal to _segment_t_start but not
- # always.
- # this must return an float scale in second
- # Note that channel_indexes can be ignored for most cases
- # except for several sampling rate.
- # Here this is the same.
- # this is not always the case
- return self._segment_t_start(block_index, seg_index)
- def _get_analogsignal_chunk(self, block_index, seg_index, i_start, i_stop, channel_indexes):
- # this must return a signal chunk limited with
- # i_start/i_stop (can be None)
- # channel_indexes can be None (=all channel) or a list or numpy.array
- # This must return a numpy array 2D (even with one channel).
- # This must return the orignal dtype. No conversion here.
- # This must as fast as possible.
- # Everything that can be done in _parse_header() must not be here.
- # Here we are lucky: our signals is always zeros!!
- # it is not always the case
- # internally signals are int16
- # convertion to real units is done with self.header['signal_channels']
- if i_start is None:
- i_start = 0
- if i_stop is None:
- i_stop = 100000
- assert i_start >= 0, "I don't like your jokes"
- assert i_stop <= 100000, "I don't like your jokes"
- if channel_indexes is None:
- nb_chan = 16
- else:
- nb_chan = len(channel_indexes)
- raw_signals = np.zeros((i_stop - i_start, nb_chan), dtype='int16')
- return raw_signals
- def _spike_count(self, block_index, seg_index, unit_index):
- # Must return the nb of spike for given (block_index, seg_index, unit_index)
- # we are lucky: our units have all the same nb of spikes!!
- # it is not always the case
- nb_spikes = 20
- return nb_spikes
- def _get_spike_timestamps(self, block_index, seg_index, unit_index, t_start, t_stop):
- # In our IO, timstamp are internally coded 'int64' and they
- # represent the index of the signals 10kHz
- # we are lucky: spikes have the same discharge in all segments!!
- # incredible neuron!! This is not always the case
- # the same clip t_start/t_start must be used in _spike_raw_waveforms()
- ts_start = (self._segment_t_start(block_index, seg_index) * 10000)
- spike_timestamps = np.arange(0, 10000, 500) + ts_start
- if t_start is not None or t_stop is not None:
- # restricte spikes to given limits (in seconds)
- lim0 = int(t_start * 10000)
- lim1 = int(t_stop * 10000)
- mask = (spike_timestamps >= lim0) & (spike_timestamps <= lim1)
- spike_timestamps = spike_timestamps[mask]
- return spike_timestamps
- def _rescale_spike_timestamp(self, spike_timestamps, dtype):
- # must rescale to second a particular spike_timestamps
- # with a fixed dtype so the user can choose the precisino he want.
- spike_times = spike_timestamps.astype(dtype)
- spike_times /= 10000. # because 10kHz
- return spike_times
- def _get_spike_raw_waveforms(self, block_index, seg_index, unit_index, t_start, t_stop):
- # this must return a 3D numpy array (nb_spike, nb_channel, nb_sample)
- # in the original dtype
- # this must be as fast as possible.
- # the same clip t_start/t_start must be used in _spike_timestamps()
- # If there there is no waveform supported in the
- # IO them _spike_raw_waveforms must return None
- # In our IO waveforms come from all channels
- # they are int16
- # convertion to real units is done with self.header['unit_channels']
- # Here, we have a realistic case: all waveforms are only noise.
- # it is not always the case
- # we 20 spikes with a sweep of 50 (5ms)
- # trick to get how many spike in the slice
- ts = self._get_spike_timestamps(block_index, seg_index, unit_index, t_start, t_stop)
- nb_spike = ts.size
- np.random.seed(2205) # a magic number (my birthday)
- waveforms = np.random.randint(low=-2**4, high=2**4, size=nb_spike * 50, dtype='int16')
- waveforms = waveforms.reshape(nb_spike, 1, 50)
- return waveforms
- def _event_count(self, block_index, seg_index, event_channel_index):
- # event and spike are very similar
- # we have 2 event channels
- if event_channel_index == 0:
- # event channel
- return 6
- elif event_channel_index == 1:
- # epoch channel
- return 10
- def _get_event_timestamps(self, block_index, seg_index, event_channel_index, t_start, t_stop):
- # the main difference between spike channel and event channel
- # is that for here we have 3 numpy array timestamp, durations, labels
- # durations must be None for 'event'
- # label must a dtype ='U'
- # in our IO event are directly coded in seconds
- seg_t_start = self._segment_t_start(block_index, seg_index)
- if event_channel_index == 0:
- timestamp = np.arange(0, 6, dtype='float64') + seg_t_start
- durations = None
- labels = np.array(['trigger_a', 'trigger_b'] * 3, dtype='U12')
- elif event_channel_index == 1:
- timestamp = np.arange(0, 10, dtype='float64') + .5 + seg_t_start
- durations = np.ones((10), dtype='float64') * .25
- labels = np.array(['zoneX'] * 5 + ['zoneZ'] * 5, dtype='U12')
- if t_start is not None:
- keep = timestamp >= t_start
- timestamp, labels = timestamp[keep], labels[keep]
- if durations is not None:
- durations = durations[keep]
- if t_stop is not None:
- keep = timestamp <= t_stop
- timestamp, labels = timestamp[keep], labels[keep]
- if durations is not None:
- durations = durations[keep]
- return timestamp, durations, labels
- def _rescale_event_timestamp(self, event_timestamps, dtype):
- # must rescale to second a particular event_timestamps
- # with a fixed dtype so the user can choose the precisino he want.
- # really easy here because in our case it is already seconds
- event_times = event_timestamps.astype(dtype)
- return event_times
- def _rescale_epoch_duration(self, raw_duration, dtype):
- # really easy here because in our case it is already seconds
- durations = raw_duration.astype(dtype)
- return durations
|