- """
- Class for reading from Brainware SRC files
- SRC files are binary files for holding spike data. They are broken up into
- nested data sequences of different types, with each type of sequence identified
- by a unique ID number. This allows new versions of sequences to be included
- without breaking backwards compatibility, since new versions can just be given
- a new ID number.
- The ID numbers and the format of the data they contain were taken from the
- Matlab-based reader function supplied with BrainWare. The Python code,
- however, was implemented from scratch using Python idioms.
- There are some situations where BrainWare data can overflow the SRC file,
- resulting in a corrupt file. Neither BrainWare nor the Matlab-based
- reader can read such files. This software, however, will try to recover
- the data, and in most cases can do so successfully.
- Each SRC file can hold the equivalent of multiple Neo Blocks.
- Brainware was developed by Dr. Jan Schnupp and is available from
- Tucker Davis Technologies, Inc.
- http://www.tdt.com/downloads.htm
- Neither Dr. Jan Schnupp nor Tucker Davis Technologies, Inc. had any part in the
- development of this code
- The code is implemented with the permission of Dr. Jan Schnupp
- Note on the port from ChannelIndex/Unit to Group (Samuel Garcia):
- the ChannelIndex was used as a group of Units. To avoid a "group of
- groups", each Unit is now directly a Group.
- Author: Todd Jennings
- """
- # import needed core python modules
- from datetime import datetime, timedelta
- from itertools import chain
- import logging
- import os.path
- # numpy and quantities are already required by neo
- import numpy as np
- import quantities as pq
- # needed core neo modules
- from neo.core import Block, Event, Group, Segment, SpikeTrain
- # need to subclass BaseIO
- from neo.io.baseio import BaseIO
- LOGHANDLER = logging.StreamHandler()
- class BrainwareSrcIO(BaseIO):
- """
- Class for reading Brainware Spike ReCord files with the extension '.src'
- The read_block method returns the first Block of the file. It will
- automatically close the file after reading.
- The read method is the same as read_block.
- The read_all_blocks method automatically reads all Blocks. It will
- automatically close the file after reading.
- The read_next_block method will return one Block each time it is called.
- It will automatically close the file and reset to the first Block
- after reading the last block.
- Call the close method to close the file and reset this method
- back to the first Block.
- The _isopen property tells whether the file is currently open and
- reading or closed.
- Note 1:
- The first Unit in each Group is always
- UnassignedSpikes, which has a SpikeTrain for each Segment containing
- all the spikes not assigned to any Unit in that Segment.
- Note 2:
- The first Segment in each Block is always Comments, which stores all
- comments as an Event object.
- Note 3:
- The parameters from the BrainWare table for each condition are stored
- in the Segment annotations. If there are multiple repetitions of
- a condition, each repetition is stored as a separate Segment.
- Note 4:
- There is always only one Group.
- Usage:
- >>> from neo.io.brainwaresrcio import BrainwareSrcIO
- >>> srcfile = BrainwareSrcIO(filename='multi_500ms_mulitrep_ch1.src')
- >>> blk1 = srcfile.read()
- >>> blk2 = srcfile.read_block()
- >>> blks = srcfile.read_all_blocks()
- >>> print(blk1.segments)
- >>> print(blk1.segments[0].spiketrains)
- >>> print(blk1.groups)
- >>> print(blk1.groups[0].name)
- >>> print(blk2)
- >>> print(blk2.segments)
- >>> print(blks)
- >>> print(blks[0].segments)
- """
- is_readable = True # This class can only read data
- is_writable = False # write is not supported
- # This class is able to directly or indirectly handle the following objects
- supported_objects = [Block, Group,
- Segment, SpikeTrain, Event]
- readable_objects = [Block]
- writeable_objects = []
- has_header = False
- is_streameable = False
- # This is for GUI stuff: a definition for parameters when reading.
- # This dict should be keyed by object (`Block`). Each entry is a list
- # of tuples. The first entry in each tuple is the parameter name. The
- # second entry is a dict with keys 'value' (for default value),
- # and 'label' (for a descriptive name).
- # Note that if the highest-level object requires parameters,
- # common_io_test will be skipped.
- read_params = {Block: []}
- # does not support write so no GUI stuff
- write_params = None
- name = 'Brainware SRC File'
- extensions = ['src']
- mode = 'file'
- def __init__(self, filename=None):
- """
- Arguments:
- filename: the filename
- """
- BaseIO.__init__(self)
- # log the __init__
- self.logger.info('__init__')
- # this stores the filename of the current object, exactly as it is
- # provided when the instance is initialized.
- self._filename = filename
- # this stores the filename without the path (set to the basename
- # when the file is opened)
- self._file_origin = filename
- # This stores the file object for the current file
- self._fsrc = None
- # This stores the current Block
- self._blk = None
- # This stores the current Segment for easy access
- # It is equivalent to self._blk.segments[0] (the Comments Segment)
- self._seg0 = None
- # this stores a dictionary of the Block's Group (Units) by name,
- # making it easier and faster to retrieve Units by name later
- # UnassignedSpikes and Units accessed by index are not stored here
- self._unitdict = {}
- # this stores the current Unit
- self._unit0 = None
- # if the file has a list with negative length, the rest of the file's
- # list lengths are unreliable, so we need to store this value for the
- # whole file
- self._damaged = False
- # this stores an empty SpikeTrain which is used in various places.
- self._default_spiketrain = None
- @property
- def _isopen(self):
- """
- This property tells whether the SRC file associated with the IO object
- is open.
- """
- return self._fsrc is not None
- def _opensrc(self):
- """
- Open the file if it isn't already open.
- """
- # if the file isn't already open, open it and clear the Blocks
- if not self._fsrc or self._fsrc.closed:
- self._fsrc = open(self._filename, 'rb')
- # figure out the filename of the current file
- self._file_origin = os.path.basename(self._filename)
- def close(self):
- """
- Close the currently-open file and reset the current reading point.
- """
- self.logger.info('close')
- if self._isopen and not self._fsrc.closed:
- self._fsrc.close()
- # we also need to reset all per-file attributes
- self._damaged = False
- self._fsrc = None
- self._seg0 = None
- self._file_origin = None
- self._lazy = False
- self._default_spiketrain = None
- def read_block(self, lazy=False, **kargs):
- """
- Reads the first Block from the Spike ReCording file "filename"
- generated with BrainWare.
- If you wish to read more than one Block, please use read_all_blocks.
- """
- assert not lazy, 'This IO does not support lazy loading'
- # no keyword arguments are implemented so far. If someone passes one,
- # they either expect it to do something or are making a mistake,
- # neither of which should pass silently
- if kargs:
- raise NotImplementedError('This method does not have any '
- 'arguments implemented yet')
- blockobj = self.read_next_block()
- self.close()
- return blockobj
- def read_next_block(self, **kargs):
- """
- Reads a single Block from the Spike ReCording file "filename"
- generated with BrainWare.
- Each call will return the next Block until all Blocks are
- loaded. After the last Block, the file will be automatically closed
- and the progress reset. Call the close method manually to reset
- back to the first Block.
- """
- # no keyword arguments are implemented so far. If someone passes one,
- # they either expect it to do something or are making a mistake,
- # neither of which should pass silently
- if kargs:
- raise NotImplementedError('This method does not have any '
- 'arguments implemented yet')
- self._opensrc()
- # create _default_spiketrain here for performance reasons
- self._default_spiketrain = self._init_default_spiketrain.copy()
- self._default_spiketrain.file_origin = self._file_origin
- # create the Block and the contents that all Blocks from this IO share
- self._blk = Block(file_origin=self._file_origin)
- self._seg0 = Segment(name='Comments', file_origin=self._file_origin)
- self._unit0 = Group(name='UnassignedSpikes',
- elliptic=[], boundaries=[],
- timestamp=[], max_valid=[])
- self._blk.groups.append(self._unit0)
- self._blk.segments.append(self._seg0)
- # this actually reads the contents of the Block
- result = []
- while hasattr(result, '__iter__'):
- try:
- result = self._read_by_id()
- except BaseException:
- self.close()
- raise
- # since we read at a Block level we always do this
- self._blk.create_many_to_one_relationship()
- # put the Block in a local variable so it can be garbage collected
- blockobj = self._blk
- # reset the per-Block attributes
- self._blk = None
- self._unitdict = {}
- # combine the comments into one big event
- self._combine_segment_events(self._seg0)
- # result is None iff the end of the file is reached, so we can
- # close the file
- # this notification is not helpful when using the read method, since
- # the user will know reading is done when the method returns a value
- if result is None:
- self.logger.info('Last Block read. Closing file.')
- self.close()
- return blockobj
- def read_all_blocks(self, lazy=False, **kargs):
- """
- Reads all Blocks from the Spike ReCording file "filename"
- generated with BrainWare.
- Prior to reading, the progress in the file is reset: the file is
- closed and then opened again.
- The file is automatically closed after reading completes.
- """
- # no keyword arguments are implemented so far. If someone passes one,
- # they either expect it to do something or are making a mistake,
- # neither of which should pass silently
- assert not lazy, 'This IO does not support lazy loading'
- if kargs:
- raise NotImplementedError('This method does not have any '
- 'arguments implemented yet')
- self.close()
- self._opensrc()
- # Read each Block.
- # After the last Block self._isopen is set to False, so this makes a
- # good way to determine when to stop
- blocks = []
- while self._isopen:
- try:
- blocks.append(self.read_next_block())
- except BaseException:
- self.close()
- raise
- return blocks
- def _convert_timestamp(self, timestamp, start_date=datetime(1899, 12, 30)):
- """
- _convert_timestamp(timestamp, start_date) - convert a timestamp in
- brainware src file units to a python datetime object.
- start_date defaults to 1899.12.30 (ISO format), which is the start date
- used by all BrainWare SRC data Blocks so far. If manually specified
- it should be a datetime object or any other object that can be added
- to a timedelta object.
- """
- # datetime + timedelta = datetime again.
- try:
- timestamp = convert_brainwaresrc_timestamp(timestamp, start_date)
- except OverflowError:
- timestamp = start_date
- self.logger.exception('_convert_timestamp overflow')
- return timestamp
- # -------------------------------------------------------------------------
- # -------------------------------------------------------------------------
- # All methods from here on are private. They are not intended to be used
- # on their own, although methods that could theoretically be called on
- # their own are marked as such. All private methods could be renamed,
- # combined, or split at any time. All private methods prefixed by
- # "__read" or "__skip" will alter the current place in the file.
- # -------------------------------------------------------------------------
- # -------------------------------------------------------------------------
- def _read_by_id(self):
- """
- Reader for generic data
- BrainWare SRC files are broken up into data sequences that are
- identified by an ID code. This method determines the ID code and calls
- the method to read the data sequence with that ID code. See the
- _ID_DICT attribute for a dictionary of code/method pairs.
- IMPORTANT!!!
- This is the only private method that can be called directly.
- The rest of the private methods can only safely be called by this
- method or by other private methods, since they depend on the
- current position in the file.
- """
- try:
- # uint16 -- the ID code of the next sequence
- seqid = np.fromfile(self._fsrc, dtype=np.uint16, count=1).item()
- except ValueError:
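- # np.fromfile returns an empty array at EOF, and .item() on an
- # empty array raises ValueError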
- # return a None if at EOF. Other methods use None to recognize
- # an EOF
- return None
- # using the seqid, get the reader function from the reader dict
- readfunc = self._ID_DICT.get(seqid)
- if readfunc is None:
- if seqid <= 0:
- # return if end-of-sequence ID code. This has to be 0.
- # just calling "return" will return a None which is used as an
- # EOF indicator
- return 0
- else:
- # log a warning if the key is invalid (this is consistent
- # with the official behavior; even the official reference
- # files contain invalid keys when read with the official
- # Matlab reference scripts)
- self.logger.warning('unknown ID: %s', seqid)
- return []
- try:
- # run the function to get the data
- return readfunc(self)
- except (EOFError, UnicodeDecodeError):
- # log the error and return None if the EOF is reached in the
- # middle of a method
- self.logger.exception('Premature end of file')
- return None
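- # For orientation, this is the on-disk layout implied by the readers in
- # this file (an inference from the code, not an official spec): every
- # data sequence starts with a uint16 ID code followed by an ID-specific
- # payload, e.g. for ID 29079 (__read_spike_fixed):
- #     uint16 ID | float32 spike time | int8 * 40 waveform | uint8 trig2
- # List sequences (IDs 29082/29083/29091/29093) hold an int16 count,
- # 4 skipped bytes, then that many nested sequences.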
- # -------------------------------------------------------------------------
- # -------------------------------------------------------------------------
- # These are helper methods. They don't read from the file, so it
- # won't harm the reading process to call them, but they are only relevant
- # when used in other private methods.
- #
- # These are tuned to the particular needs of this IO class, they are
- # unlikely to work properly if used with another file format.
- # -------------------------------------------------------------------------
- # -------------------------------------------------------------------------
- def _assign_sequence(self, data_obj):
- """
- _assign_sequence(data_obj) - Try to guess where an unknown sequence
- should go based on its class. Warnings are issued if this method is
- used, since manual reorganization may be needed.
- """
- if isinstance(data_obj, Group):
- self.logger.warning('Unknown Group found, adding to Group list')
- self._blk.groups.append(data_obj)
- if data_obj.name:
- self._unitdict[data_obj.name] = data_obj
- elif isinstance(data_obj, Segment):
- self.logger.warning('Unknown Segment found, '
- 'adding to Segments list')
- self._blk.segments.append(data_obj)
- elif isinstance(data_obj, Event):
- self.logger.warning('Unknown Event found, '
- 'adding to comment Events list')
- self._seg0.events.append(data_obj)
- elif isinstance(data_obj, SpikeTrain):
- self.logger.warning('Unknown SpikeTrain found, '
- 'adding to the UnassignedSpikes Unit')
- self._unit0.spiketrains.append(data_obj)
- elif hasattr(data_obj, '__iter__') and not isinstance(data_obj, str):
- for sub_obj in data_obj:
- self._assign_sequence(sub_obj)
- else:
- if self.logger.isEnabledFor(logging.WARNING):
- self.logger.warning('Unrecognized sequence of type %s found, '
- 'skipping', type(data_obj))
- _default_datetime = datetime(1, 1, 1)
- _default_t_start = pq.Quantity(0., units=pq.ms, dtype=np.float32)
- _init_default_spiketrain = SpikeTrain(times=pq.Quantity([], units=pq.ms,
- dtype=np.float32),
- t_start=pq.Quantity(0, units=pq.ms,
- dtype=np.float32
- ),
- t_stop=pq.Quantity(1, units=pq.ms,
- dtype=np.float32),
- waveforms=pq.Quantity([[[]]],
- dtype=np.int8,
- units=pq.mV),
- dtype=np.float32, copy=False,
- timestamp=_default_datetime,
- respwin=np.array([], dtype=np.int32),
- dama_index=-1,
- trig2=pq.Quantity([], units=pq.ms,
- dtype=np.uint8),
- side='')
- def _combine_events(self, events):
- """
- _combine_events(events) - combine a list of Events
- with single events into one long Event
- """
- if not events:
- event = Event(times=pq.Quantity([], units=pq.s),
- labels=np.array([], dtype='U'),
- senders=np.array([], dtype='S'),
- t_start=0)
- return event
- times = []
- labels = []
- senders = []
- for event in events:
- times.append(event.times.magnitude)
- if event.labels.shape == (1,):
- labels.append(event.labels[0])
- else:
- raise AssertionError("This single event has multiple labels in an array with "
- "shape {} instead of a single label.".
- format(event.labels.shape))
- senders.append(event.annotations['sender'])
- times = np.array(times, dtype=np.float32)
- t_start = times.min()
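- # event times are day-stamps (days since dec 30th 1899, per
- # __read_comment); shift them relative to the earliest event and
- # convert days to seconds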
- times = pq.Quantity(times - t_start, units=pq.d).rescale(pq.s)
- labels = np.array(labels, dtype='U')
- senders = np.array(senders)
- event = Event(times=times, labels=labels,
- t_start=t_start.tolist(), senders=senders)
- return event
- def _combine_segment_events(self, segment):
- """
- _combine_segment_events(segment)
- Combine all Events in a segment.
- """
- event = self._combine_events(segment.events)
- event_t_start = event.annotations.pop('t_start')
- segment.rec_datetime = self._convert_timestamp(event_t_start)
- segment.events = [event]
- event.segment = segment
- def _combine_spiketrains(self, spiketrains):
- """
- _combine_spiketrains(spiketrains) - combine a list of SpikeTrains
- with single spikes into one long SpikeTrain
- """
- if not spiketrains:
- return self._default_spiketrain.copy()
- if hasattr(spiketrains[0], 'waveforms') and len(spiketrains) == 1:
- train = spiketrains[0]
- return train
- if hasattr(spiketrains[0], 't_stop'):
- # workaround for bug in some broken files
- istrain = [hasattr(utrain, 'waveforms') for utrain in spiketrains]
- if not all(istrain):
- goodtrains = [itrain for i, itrain in enumerate(spiketrains)
- if istrain[i]]
- badtrains = [itrain for i, itrain in enumerate(spiketrains)
- if not istrain[i]]
- spiketrains = (goodtrains +
- [self._combine_spiketrains(badtrains)])
- spiketrains = [itrain for itrain in spiketrains if itrain.size > 0]
- if not spiketrains:
- return self._default_spiketrain.copy()
- # get the times of the spiketrains and combine them
- waveforms = [itrain.waveforms for itrain in spiketrains]
- rawtrains = np.array(np.concatenate(spiketrains, axis=0))
- times = pq.Quantity(rawtrains, units=pq.ms, copy=False)
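- # waveform arrays can disagree in channel count and points per spike
- # (e.g. fixed 40-point vs. variable-length spike shapes), so zero-pad
- # them to a common shape before concatenating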
- lens1 = np.array([wave.shape[1] for wave in waveforms])
- lens2 = np.array([wave.shape[2] for wave in waveforms])
- if lens1.max() != lens1.min() or lens2.max() != lens2.min():
- lens1 = lens1.max() - lens1
- lens2 = lens2.max() - lens2
- waveforms = [np.pad(waveform,
- ((0, 0), (0, len1), (0, len2)),
- 'constant')
- for waveform, len1, len2 in zip(waveforms,
- lens1,
- lens2)]
- waveforms = np.concatenate(waveforms, axis=0)
- # extract the trig2 annotation
- trig2 = np.array(np.concatenate([itrain.annotations['trig2'] for
- itrain in spiketrains], axis=0))
- trig2 = pq.Quantity(trig2, units=pq.ms)
- elif hasattr(spiketrains[0], 'units'):
- return self._combine_spiketrains([spiketrains])
- else:
- times, waveforms, trig2 = zip(*spiketrains)
- times = np.concatenate(times, axis=0)
- # get the times of the SpikeTrains and combine them
- times = pq.Quantity(times, units=pq.ms, copy=False)
- # get the waveforms of the SpikeTrains and combine them
- # these should be a 3D array with the first axis being the spike,
- # the second axis being the recording channel (there is only one),
- # and the third axis being the actual waveform
- waveforms = np.concatenate(waveforms, axis=0)
- # extract the trig2 annotation
- trig2 = pq.Quantity(np.hstack(trig2),
- units=pq.ms, copy=False)
- if not times.size:
- return self._default_spiketrain.copy()
- # get the maximum time
- t_stop = times[-1] * 2.
- waveforms = pq.Quantity(waveforms, units=pq.mV, copy=False)
- train = SpikeTrain(times=times, copy=False,
- t_start=self._default_t_start.copy(), t_stop=t_stop,
- file_origin=self._file_origin,
- waveforms=waveforms,
- timestamp=self._default_datetime,
- respwin=np.array([], dtype=np.int32),
- dama_index=-1, trig2=trig2, side='')
- return train
- # -------------------------------------------------------------------------
- # -------------------------------------------------------------------------
- # IMPORTANT!!!
- # These are private methods implementing the internal reading mechanism.
- # Due to the way BrainWare SRC files are structured, they CANNOT be used
- # on their own. Calling these manually will almost certainly alter your
- # position in the file in an unrecoverable manner, whether they throw
- # an exception or not.
- # -------------------------------------------------------------------------
- # -------------------------------------------------------------------------
- def __read_str(self, numchars=1, utf=None):
- """
- Read a string of a specific length.
- """
- rawstr = np.fromfile(self._fsrc, dtype='S%s' % numchars, count=1).item()
- return rawstr.decode('utf-8')
- def __read_annotations(self):
- """
- Read the stimulus grid properties.
- -------------------------------------------------------------------
- Returns a dictionary containing the parameter names as keys and the
- parameter values as values.
- The returned object must be added to the Block.
- ID: 29109
- """
- # int16 -- number of stimulus parameters
- numelements = np.fromfile(self._fsrc, dtype=np.int16, count=1)[0]
- if not numelements:
- return {}
- # [data sequence] * numelements -- parameter names
- names = []
- for i in range(numelements):
- # {skip} = byte (char) -- skip one byte
- self._fsrc.seek(1, 1)
- # uint8 -- length of next string
- numchars = np.fromfile(self._fsrc, dtype=np.uint8, count=1).item()
- # if there is no name, make one up
- if not numchars:
- name = 'param%s' % i
- else:
- # char * numchars -- parameter name string
- name = self.__read_str(numchars)
- # if the name is already in there, add a unique number to it
- # so it isn't overwritten
- if name in names:
- name = name + str(i)
- names.append(name)
- # float32 * numelements -- an array of parameter values
- values = np.fromfile(self._fsrc, dtype=np.float32,
- count=numelements)
- # combine the names and values into a dict
- # the dict will be added to the annotations
- annotations = dict(zip(names, values))
- return annotations
- def __read_annotations_old(self):
- """
- Read the stimulus grid properties.
- Returns a dictionary containing the parameter names as keys and the
- parameter values as values.
- ------------------------------------------------
- The returned objects must be added to the Block.
- This reads an old version of the format that does not store parameter
- names, so placeholder names are created instead.
- ID: 29099
- """
- # int16 * 14 -- an array of parameter values
- values = np.fromfile(self._fsrc, dtype=np.int16, count=14)
- # create dummy names and combine them with the values in a dict
- # the dict will be added to the annotations
- params = ['param%s' % i for i in range(len(values))]
- annotations = dict(zip(params, values))
- return annotations
- def __read_comment(self):
- """
- Read a single comment.
- The comment is stored as an Event in Segment 0, which is
- specifically for comments.
- ----------------------
- Returns an empty list.
- The returned object is already added to the Block.
- No ID number: always called from another method
- """
- # float64 -- timestamp (number of days since dec 30th 1899)
- time = np.fromfile(self._fsrc, dtype=np.double, count=1)[0]
- # int16 -- length of next string
- numchars1 = np.fromfile(self._fsrc, dtype=np.int16, count=1).item()
- # char * numchars -- the one who sent the comment
- sender = self.__read_str(numchars1)
- # int16 -- length of next string
- numchars2 = np.fromfile(self._fsrc, dtype=np.int16, count=1).item()
- # char * numchars -- comment text
- text = self.__read_str(numchars2, utf=False)
- comment = Event(times=pq.Quantity(time, units=pq.d), labels=[text],
- sender=sender, file_origin=self._file_origin)
- self._seg0.events.append(comment)
- return []
- def __read_list(self):
- """
- Read a list of arbitrary data sequences.
- The list itself only stores how many data sequences should be read.
- These sequences are then read by their ID number.
- Note that lists can be nested.
- If there are too many sequences (for instance if there are a large
- number of spikes in a Segment) then a negative number will be returned
- for the number of data sequences to read. In this case the method
- tries to guess. This also means that all future list data sequences
- have unreliable lengths as well.
- -------------------------------------------
- Returns a list of objects.
- Whether these objects need to be added to the Block depends on the
- object in question.
- There are several data sequences that have identical formats but are
- used in different situations. That means this data sequence has
- multiple ID numbers.
- ID: 29082
- ID: 29083
- ID: 29091
- ID: 29093
- """
- # int16 -- number of sequences to read
- numelements = np.fromfile(self._fsrc, dtype=np.int16, count=1)[0]
- # {skip} = bytes * 4 (int16 * 2) -- skip four bytes
- self._fsrc.seek(4, 1)
- if numelements == 0:
- return []
- if not self._damaged and numelements < 0:
- self._damaged = True
- self.logger.error('Negative sequence count %s, file damaged',
- numelements)
- if not self._damaged:
- # read the sequences into a list
- seq_list = [self._read_by_id() for _ in range(numelements)]
- else:
- # read until we get some indication we should stop
- seq_list = []
- # uint16 -- the ID of the next sequence
- seqidinit = np.fromfile(self._fsrc, dtype=np.uint16, count=1)[0]
- # {rewind} = byte * 2 (int16) -- move back 2 bytes, i.e. go back to
- # before the beginning of the seqid
- self._fsrc.seek(-2, 1)
- while True:
- # uint16 -- the ID of the next sequence
- seqid = np.fromfile(self._fsrc, dtype=np.uint16, count=1)[0]
- # {rewind} = byte * 2 (int16) -- move back 2 bytes, i.e. go
- # back to before the beginning of the seqid
- self._fsrc.seek(-2, 1)
- # if we come across a new sequence, we are at the end of the
- # list so we should stop
- if seqidinit != seqid:
- break
- # otherwise read the next sequence
- seq_list.append(self._read_by_id())
- return seq_list
- def __read_segment(self):
- """
- Read an individual Segment.
- A Segment contains a dictionary of parameters, the length of the
- recording, a list of Units with their Spikes, and a list of Spikes
- not assigned to any Unit. The unassigned spikes are always stored in
- Unit 0, which is exclusively for storing these spikes.
- -------------------------------------------------
- Returns the Segment object created by the method.
- The returned object is already added to the Block.
- ID: 29106
- """
- # (data_obj) -- the stimulus parameters for this segment
- annotations = self._read_by_id()
- annotations['feature_type'] = -1
- annotations['go_by_closest_unit_center'] = False
- annotations['include_unit_bounds'] = False
- # (data_obj) -- SpikeTrain list of unassigned spikes
- # these go in the first Unit since it is for unassigned spikes
- unassigned_spikes = self._read_by_id()
- self._unit0.spiketrains.extend(unassigned_spikes)
- # read a list of units and grab the second return value, which is the
- # SpikeTrains from this Segment (if we use the Unit we will get all
- # the SpikeTrains from that Unit, resulting in duplicates if we are
- # past the first Segment)
- trains = self._read_by_id()
- if not trains:
- if unassigned_spikes:
- # if there are no assigned spikes,
- # just use the unassigned spikes
- trains = zip(unassigned_spikes)
- else:
- # if there are no spiketrains at all,
- # create an empty spike train
- trains = [[self._default_spiketrain.copy()]]
- elif hasattr(trains[0], 'dtype'):
- # workaround for some broken files
- trains = [unassigned_spikes +
- [self._combine_spiketrains([trains])]]
- else:
- # get the second element from each returned value,
- # which is the actual SpikeTrains
- trains = [unassigned_spikes] + [train[1] for train in trains]
- # re-organize by sweeps
- trains = zip(*trains)
- # int32 -- SpikeTrain length in ms
- spiketrainlen = pq.Quantity(np.fromfile(self._fsrc, dtype=np.int32,
- count=1)[0], units=pq.ms, copy=False)
- segments = []
- for train in trains:
- # create the Segment and add everything to it
- segment = Segment(file_origin=self._file_origin,
- **annotations)
- segment.spiketrains = train
- self._blk.segments.append(segment)
- segments.append(segment)
- for itrain in train:
- # use the SpikeTrain length to figure out the stop time
- # t_start is always 0 so we can ignore it
- itrain.t_stop = spiketrainlen
- return segments
- def __read_segment_list(self):
- """
- Read a list of Segments with comments.
- Since comments can occur at any point, whether a recording is happening
- or not, it is impossible to reliably assign them to a specific Segment.
- For this reason they are always assigned to Segment 0, which is
- exclusively used to store comments.
- --------------------------------------------------------
- Returns a list of the Segments created with this method.
- The returned objects are already added to the Block.
- ID: 29112
- """
- # uint8 -- number of electrode channels in the Segment
- numchannels = np.fromfile(self._fsrc, dtype=np.uint8, count=1)[0]
- # [list of sequences] -- individual Segments
- segments = self.__read_list()
- while not hasattr(segments[0], 'spiketrains'):
- segments = list(chain(*segments))
- # char -- "side of brain" info
- side = self.__read_str(1)
- # int16 -- number of comments
- numelements = np.fromfile(self._fsrc, dtype=np.int16, count=1)[0]
- # comment_obj * numelements -- comments about the Segments
- # we don't know which Segment specifically, though
- for _ in range(numelements):
- self.__read_comment()
- # store what side of the head we are dealing with
- for segment in segments:
- for spiketrain in segment.spiketrains:
- spiketrain.annotations['side'] = side
- return segments
- def __read_segment_list_v8(self):
- """
- Read a list of Segments with comments.
- This is version 8 of the data sequence.
- This is the same as __read_segment_list_var, but can also contain
- one or more arbitrary sequences. The class makes an attempt to assign
- the sequences when possible, and warns the user when this happens
- (see the _assign_sequence method)
- --------------------------------------------------------
- Returns a list of the Segments created with this method.
- The returned objects are already added to the Block.
- ID: 29117
- """
- # segment_collection_var -- this is based off a segment_collection_var
- segments = self.__read_segment_list_var()
- # uint16 -- the ID of the next sequence
- seqid = np.fromfile(self._fsrc, dtype=np.uint16, count=1)[0]
- # {rewind} = byte * 2 (int16) -- move back 2 bytes, i.e. go back to
- # before the beginning of the seqid
- self._fsrc.seek(-2, 1)
- if seqid in self._ID_DICT:
- # if it is a valid seqid, read it and try to figure out where
- # to put it
- self._assign_sequence(self._read_by_id())
- else:
- # otherwise it is a Unit list
- self.__read_unit_list()
- # {skip} = byte * 2 (int16) -- skip 2 bytes
- self._fsrc.seek(2, 1)
- return segments
- def __read_segment_list_v9(self):
- """
- Read a list of Segments with comments.
- This is version 9 of the data sequence.
- This is the same as __read_segment_list_v8, but contains some
- additional annotations. These annotations are added to the Segment.
- --------------------------------------------------------
- Returns a list of the Segments created with this method.
- The returned objects are already added to the Block.
- ID: 29120
- """
- # segment_collection_v8 -- this is based off a segment_collection_v8
- segments = self.__read_segment_list_v8()
- # uint8
- feature_type = np.fromfile(self._fsrc, dtype=np.uint8,
- count=1)[0]
- # uint8 -- stored as a byte, read as a boolean
- go_by_closest_unit_center = np.fromfile(self._fsrc, dtype=np.bool_,
- count=1)[0]
- # uint8 -- stored as a byte, read as a boolean
- include_unit_bounds = np.fromfile(self._fsrc, dtype=np.bool_,
- count=1)[0]
- # create a dictionary of the annotations
- annotations = {'feature_type': feature_type,
- 'go_by_closest_unit_center': go_by_closest_unit_center,
- 'include_unit_bounds': include_unit_bounds}
- # add the annotations to each Segment
- for segment in segments:
- segment.annotations.update(annotations)
- return segments
- def __read_segment_list_var(self):
- """
- Read a list of Segments with comments.
- This is the same as __read_segment_list, but contains information
- regarding the sampling period. This information is added to the
- SpikeTrains in the Segments.
- --------------------------------------------------------
- Returns a list of the Segments created with this method.
- The returned objects are already added to the Block.
- ID: 29114
- """
- # float32 -- DA conversion clock period in microsec
- sampling_period = pq.Quantity(np.fromfile(self._fsrc,
- dtype=np.float32, count=1),
- units=pq.us, copy=False)[0]
- # segment_collection -- this is based off a segment_collection
- segments = self.__read_segment_list()
- # add the sampling period to each SpikeTrain
- for segment in segments:
- for spiketrain in segment.spiketrains:
- spiketrain.sampling_period = sampling_period
- return segments
- def __read_spike_fixed(self, numpts=40):
- """
- Read a spike with a fixed waveform length (40 time bins)
- -------------------------------------------
- Returns the time, waveform and trig2 value.
- The returned objects must be converted to a SpikeTrain then
- added to the Block.
- ID: 29079
- """
- # float32 -- spike time stamp in ms since start of SpikeTrain
- time = np.fromfile(self._fsrc, dtype=np.float32, count=1)
- # int8 * 40 -- spike shape -- use numpts for spike_var
- waveform = np.fromfile(self._fsrc, dtype=np.int8,
- count=numpts).reshape(1, 1, numpts)
- # uint8 -- point of return to noise
- trig2 = np.fromfile(self._fsrc, dtype=np.uint8, count=1)
- return time, waveform, trig2
- def __read_spike_fixed_old(self):
- """
- Read a spike with a fixed waveform length (40 time bins)
- This is an old version of the format. The time is stored as ints
- representing 1/25 ms time steps. It has no trigger information.
- -------------------------------------------
- Returns the time, waveform and trig2 value.
- The returned objects must be converted to a SpikeTrain then
- added to the Block.
- ID: 29081
- """
- # int32 -- spike time stamp in 1/25 ms steps (converted to ms below)
- time = np.fromfile(self._fsrc, dtype=np.int32, count=1) / 25.
- time = time.astype(np.float32)
- # int8 * 40 -- spike shape
- # This needs to be a 3D array, one for each channel. BrainWare
- # only ever has a single channel per file.
- waveform = np.fromfile(self._fsrc, dtype=np.int8,
- count=40).reshape(1, 1, 40)
- # create a dummy trig2 value
- trig2 = np.array([-1], dtype=np.uint8)
- return time, waveform, trig2
- def __read_spike_var(self):
- """
- Read a spike with a variable waveform length
- -------------------------------------------
- Returns the time, waveform and trig2 value.
- The returned objects must be converted to a SpikeTrain then
- added to the Block.
- ID: 29115
- """
- # uint8 -- number of points in spike shape
- numpts = np.fromfile(self._fsrc, dtype=np.uint8, count=1)[0]
- # spike_fixed is the same as spike_var if you don't read the numpts
- # byte and set numpts = 40
- return self.__read_spike_fixed(numpts)
- def __read_spiketrain_indexed(self):
- """
- Read a SpikeTrain
- This is the same as __read_spiketrain_timestamped except it also
- contains the index of the Segment in the dam file.
- The index is stored as an annotation in the SpikeTrain.
- -------------------------------------------------
- Returns a SpikeTrain object with multiple spikes.
- The returned object must be added to the Block.
- ID: 29121
- """
- # int32 -- index of the analogsignalarray in corresponding .dam file
- dama_index = np.fromfile(self._fsrc, dtype=np.int32,
- count=1)[0]
- # spiketrain_timestamped -- this is based off a spiketrain_timestamped
- spiketrain = self.__read_spiketrain_timestamped()
- # add the property to the dict
- spiketrain.annotations['dama_index'] = dama_index
- return spiketrain
- def __read_spiketrain_timestamped(self):
- """
- Read a SpikeTrain
- This SpikeTrain contains a time stamp for when it was recorded
- The timestamp is stored as an annotation in the SpikeTrain.
- -------------------------------------------------
- Returns a SpikeTrain object with multiple spikes.
- The returned object must be added to the Block.
- ID: 29110
- """
- # float64 -- timeStamp (number of days since dec 30th 1899)
- timestamp = np.fromfile(self._fsrc, dtype=np.double, count=1)[0]
- # convert to datetime object
- timestamp = self._convert_timestamp(timestamp)
- # seq_list -- spike list
- # combine the spikes into a single SpikeTrain
- spiketrain = self._combine_spiketrains(self.__read_list())
- # add the timestamp
- spiketrain.annotations['timestamp'] = timestamp
- return spiketrain
- def __read_unit(self):
- """
- Read all SpikeTrains from a single Segment and Unit
- This is the same as __read_unit_unsorted except it also contains
- information on the spike sorting boundaries.
- ------------------------------------------------------------------
- Returns a single Unit and a list of SpikeTrains from that Unit and
- current Segment, in that order. The SpikeTrains must be returned since
- it is not possible to determine from the Unit which SpikeTrains are
- from the current Segment.
- The returned objects are already added to the Block. The SpikeTrains
- must be added to the current Segment.
- ID: 29116
- """
- # same as unsorted Unit
- unit, trains = self.__read_unit_unsorted()
- # float32 * 18 -- Unit boundaries (IEEE 32-bit floats)
- unit.annotations['boundaries'] = [np.fromfile(self._fsrc,
- dtype=np.float32,
- count=18)]
- # uint8 * 9 -- boolean values indicating elliptic feature boundary
- # dimensions
- unit.annotations['elliptic'] = [np.fromfile(self._fsrc,
- dtype=np.uint8,
- count=9)]
- return unit, trains
- def __read_unit_list(self):
- """
- A list of a list of Units
- -----------------------------------------------
- Returns a list of Units modified in the method.
- The returned objects are already added to the Block.
- No ID number: only called by other methods
- """
- # this is used to figure out which Units to return
- maxunit = 1
- # int16 -- number of time slices
- numelements = np.fromfile(self._fsrc, dtype=np.int16, count=1)[0]
- # {sequence} * numelements -- the lists of Units to read
- for i in range(numelements):
- # {skip} = byte * 2 (int16) -- skip 2 bytes
- self._fsrc.seek(2, 1)
- # double
- max_valid = np.fromfile(self._fsrc, dtype=np.double, count=1)[0]
- # int16 - the number of Units to read
- numunits = np.fromfile(self._fsrc, dtype=np.int16, count=1)[0]
- # update the maximum Unit count so far
- maxunit = max(maxunit, numunits + 1)
- # if there aren't enough Units, create them
- # remember we need to skip the UnassignedSpikes Unit
- if numunits > len(self._blk.groups) + 1:
- for ind1 in range(len(self._blk.groups), numunits + 1):
- unit = Group(name='unit%s' % ind1,
- file_origin=self._file_origin,
- elliptic=[], boundaries=[],
- timestamp=[], max_valid=[])
- self._blk.groups.append(unit)
- # {Block} * numelements -- Units
- for ind1 in range(numunits):
- # get the Unit with the given index
- # remember we need to skip the UnassignedSpikes Unit
- unit = self._blk.groups[ind1 + 1]
- # {skip} = byte * 2 (int16) -- skip 2 bytes
- self._fsrc.seek(2, 1)
- # int16 -- a multiplier for the elliptic and boundaries
- # properties
- numelements3 = np.fromfile(self._fsrc, dtype=np.int16,
- count=1)[0]
- # uint8 * 10 * numelements3 -- boolean values indicating
- # elliptic feature boundary dimensions
- elliptic = np.fromfile(self._fsrc, dtype=np.uint8,
- count=10 * numelements3)
- # float32 * 20 * numelements3 -- feature boundaries
- boundaries = np.fromfile(self._fsrc, dtype=np.float32,
- count=20 * numelements3)
- unit.annotations['elliptic'].append(elliptic)
- unit.annotations['boundaries'].append(boundaries)
- unit.annotations['max_valid'].append(max_valid)
- return self._blk.groups[1:maxunit]
- def __read_unit_list_timestamped(self):
- """
- A list of a list of Units.
- This is the same as __read_unit_list, except that it also has a
- timestamp. This is added as an annotation to all Units.
- -----------------------------------------------
- Returns a list of Units modified in the method.
- The returned objects are already added to the Block.
- ID: 29119
- """
- # double -- time zero (number of days since dec 30th 1899)
- timestamp = np.fromfile(self._fsrc, dtype=np.double, count=1)[0]
- # convert to a datetime object
- timestamp = self._convert_timestamp(timestamp)
- # sorter -- this is based off a sorter
- units = self.__read_unit_list()
- for unit in units:
- unit.annotations['timestamp'].append(timestamp)
- return units
- def __read_unit_old(self):
- """
- Read all SpikeTrains from a single Segment and Unit
- This is the same as __read_unit_unsorted except it also contains
- information on the spike sorting boundaries.
- This is an old version of the format that used 48-bit floating-point
- numbers for the boundaries. These cannot easily be read and so are
- skipped.
- ------------------------------------------------------------------
- Returns a single Unit and a list of SpikeTrains from that Unit and
- current Segment, in that order. The SpikeTrains must be returned since
- it is not possible to determine from the Unit which SpikeTrains are
- from the current Segment.
- The returned objects are already added to the Block. The SpikeTrains
- must be added to the current Segment.
- ID: 29107
- """
- # same as Unit
- unit, trains = self.__read_unit_unsorted()
- # bytes * 108 (float48 * 18) -- Unit boundaries (48-bit floating
- # point numbers are not supported so we skip them)
- self._fsrc.seek(108, 1)
- # uint8 * 9 -- boolean values indicating elliptic feature boundary
- # dimensions
- unit.annotations['elliptic'] = np.fromfile(self._fsrc, dtype=np.uint8,
- count=9).tolist()
- return unit, trains
- def __read_unit_unsorted(self):
- """
- Read all SpikeTrains from a single Segment and Unit
- This does not contain Unit boundaries.
- ------------------------------------------------------------------
- Returns a single Unit and a list of SpikeTrains from that Unit and
- current Segment, in that order. The SpikeTrains must be returned since
- it is not possible to determine from the Unit which SpikeTrains are
- from the current Segment.
- The returned objects are already added to the Block. The SpikeTrains
- must be added to the current Segment.
- ID: 29084
- """
- # {skip} = bytes * 2 (uint16) -- skip two bytes
- self._fsrc.seek(2, 1)
- # uint16 -- number of characters in next string
- numchars = np.fromfile(self._fsrc, dtype=np.uint16, count=1).item()
- # char * numchars -- ID string of Unit
- name = self.__read_str(numchars)
- # int32 -- SpikeTrain length in ms
- # int32 * 4 -- response and spon period boundaries
- parts = np.fromfile(self._fsrc, dtype=np.int32, count=5)
- t_stop = pq.Quantity(parts[0].astype('float32'),
- units=pq.ms, copy=False)
- respwin = parts[1:]
- # (data_obj) -- list of SpikeTrains
- spikeslists = self._read_by_id()
- # use the Unit if it already exists, otherwise create it
- if name in self._unitdict:
- unit = self._unitdict[name]
- else:
- unit = Group(name=name, file_origin=self._file_origin,
- elliptic=[], boundaries=[], timestamp=[], max_valid=[])
- self._blk.groups.append(unit)
- self._unitdict[name] = unit
- # convert the individual spikes to SpikeTrains and add them to the Unit
- trains = [self._combine_spiketrains(spikes) for spikes in spikeslists]
- unit.spiketrains.extend(trains)
- for train in trains:
- train.t_stop = t_stop.copy()
- train.annotations['respwin'] = respwin.copy()
- return unit, trains
- def __skip_information(self):
- """
- Read an information sequence.
- This data sequence is skipped both here and in the Matlab reference
- implementation.
- ----------------------
- Returns an empty list
- Nothing is created so nothing is added to the Block.
- ID: 29113
- """
- # {skip} char * 34 -- display information
- self._fsrc.seek(34, 1)
- return []
- def __skip_information_old(self):
- """
- Read an information sequence
- This data sequence is skipped both here and in the Matlab reference
- implementation.
- This is an old version of the format.
- ----------------------
- Returns an empty list.
- Nothing is created so nothing is added to the Block.
- ID: 29100
- """
- # {skip} char * 4 -- display information
- self._fsrc.seek(4, 1)
- return []
- # This dictionary maps the numeric data sequence ID codes to the data
- # sequence reading functions.
- #
- # Since functions are first-class objects in Python, the functions returned
- # from this dictionary are directly callable.
- #
- # If new data sequence ID codes are added in the future please add the code
- # here in numeric order and the method above in alphabetical order
- #
- # The naming of any private method may change at any time
- _ID_DICT = {29079: __read_spike_fixed,
- 29081: __read_spike_fixed_old,
- 29082: __read_list,
- 29083: __read_list,
- 29084: __read_unit_unsorted,
- 29091: __read_list,
- 29093: __read_list,
- 29099: __read_annotations_old,
- 29100: __skip_information_old,
- 29106: __read_segment,
- 29107: __read_unit_old,
- 29109: __read_annotations,
- 29110: __read_spiketrain_timestamped,
- 29112: __read_segment_list,
- 29113: __skip_information,
- 29114: __read_segment_list_var,
- 29115: __read_spike_var,
- 29116: __read_unit,
- 29117: __read_segment_list_v8,
- 29119: __read_unit_list_timestamped,
- 29120: __read_segment_list_v9,
- 29121: __read_spiketrain_indexed
- }
- def convert_brainwaresrc_timestamp(timestamp,
- start_date=datetime(1899, 12, 30)):
- """
- convert_brainwaresrc_timestamp(timestamp, start_date) - convert a timestamp
- in brainware src file units to a python datetime object.
- start_date defaults to 1899.12.30 (ISO format), which is the start date
- used by all BrainWare SRC data Blocks so far. If manually specified
- it should be a datetime object or any other object that can be added
- to a timedelta object.
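- Example (day count chosen for illustration): 36526.5 days after
- 1899-12-30 is noon on 2000-01-01:
- >>> convert_brainwaresrc_timestamp(36526.5)
- datetime.datetime(2000, 1, 1, 12, 0)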
- """
- # datetime + timedelta = datetime again.
- return start_date + timedelta(days=timestamp)
- if __name__ == '__main__':
- # run this when calling the file directly as a benchmark
- from neo.test.iotest.test_brainwaresrcio import FILES_TO_TEST
- from neo.test.iotest.common_io_test import url_for_tests
- from neo.test.iotest.tools import (create_local_temp_dir,
- download_test_file,
- get_test_file_full_path,
- make_all_directories)
- shortname = BrainwareSrcIO.__name__.lower().strip('io')
- local_test_dir = create_local_temp_dir(shortname)
- url = url_for_tests + shortname
- FILES_TO_TEST.remove('long_170s_1rep_1clust_ch2.src')
- make_all_directories(FILES_TO_TEST, local_test_dir)
- download_test_file(FILES_TO_TEST, local_test_dir, url)
- for path in get_test_file_full_path(ioclass=BrainwareSrcIO,
- filename=FILES_TO_TEST,
- directory=local_test_dir):
- ioobj = BrainwareSrcIO(path)
- ioobj.read_all_blocks(lazy=False)