12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586 |
- # -*- coding: utf-8 -*-
- """
- Class for reading from Brainware SRC files
- SRC files are binary files for holding spike data. They are broken up into
- nested data sequences of different types, with each type of sequence identified
- by a unique ID number. This allows new versions of sequences to be included
- without breaking backwards compatibility, since new versions can just be given
- a new ID number.
- The ID numbers and the format of the data they contain were taken from the
- Matlab-based reader function supplied with BrainWare. The python code,
- however, was implemented from scratch in Python using Python idioms.
- There are some situations where BrainWare data can overflow the SRC file,
- resulting in a corrupt file. Neither BrainWare nor the Matlab-based
- reader can read such files. This software, however, will try to recover
- the data, and in most cases can do so successfully.
- Each SRC file can hold the equivalent of multiple Neo Blocks.
- Brainware was developed by Dr. Jan Schnupp and is availabe from
- Tucker Davis Technologies, Inc.
- http://www.tdt.com/downloads.htm
- Neither Dr. Jan Schnupp nor Tucker Davis Technologies, Inc. had any part in the
- development of this code
- The code is implemented with the permission of Dr. Jan Schnupp
- Author: Todd Jennings
- """
- # needed for python 3 compatibility
- from __future__ import absolute_import, division, print_function
- # import needed core python modules
- from datetime import datetime, timedelta
- from itertools import chain
- import logging
- import os.path
- import sys
- # numpy and quantities are already required by neo
- import numpy as np
- import quantities as pq
- # needed core neo modules
- from neo.core import (Block, Event,
- ChannelIndex, Segment, SpikeTrain, Unit)
- # need to subclass BaseIO
- from neo.io.baseio import BaseIO
- LOGHANDLER = logging.StreamHandler()
- PY_VER = sys.version_info[0]
- class BrainwareSrcIO(BaseIO):
- """
- Class for reading Brainware Spike ReCord files with the extension '.src'
- The read_block method returns the first Block of the file. It will
- automatically close the file after reading.
- The read method is the same as read_block.
- The read_all_blocks method automatically reads all Blocks. It will
- automatically close the file after reading.
- The read_next_block method will return one Block each time it is called.
- It will automatically close the file and reset to the first Block
- after reading the last block.
- Call the close method to close the file and reset this method
- back to the first Block.
- The _isopen property tells whether the file is currently open and
- reading or closed.
- Note 1:
- The first Unit in each ChannelIndex is always
- UnassignedSpikes, which has a SpikeTrain for each Segment containing
- all the spikes not assigned to any Unit in that Segment.
- Note 2:
- The first Segment in each Block is always Comments, which stores all
- comments as an Event object.
- Note 3:
- The parameters from the BrainWare table for each condition are stored
- in the Segment annotations. If there are multiple repetitions of
- a condition, each repetition is stored as a separate Segment.
- Note 4:
- There is always only one ChannelIndex. BrainWare stores the
- equivalent of ChannelIndexes in separate files.
- Usage:
- >>> from neo.io.brainwaresrcio import BrainwareSrcIO
- >>> srcfile = BrainwareSrcIO(filename='multi_500ms_mulitrep_ch1.src')
- >>> blk1 = srcfile.read()
- >>> blk2 = srcfile.read_block()
- >>> blks = srcfile.read_all_blocks()
- >>> print blk1.segments
- >>> print blk1.segments[0].spiketrains
- >>> print blk1.units
- >>> print blk1.units[0].name
- >>> print blk2
- >>> print blk2[0].segments
- >>> print blks
- >>> print blks[0].segments
- """
- is_readable = True # This class can only read data
- is_writable = False # write is not supported
- # This class is able to directly or indirectly handle the following objects
- supported_objects = [Block, ChannelIndex,
- Segment, SpikeTrain, Event, Unit]
- readable_objects = [Block]
- writeable_objects = []
- has_header = False
- is_streameable = False
- # This is for GUI stuff: a definition for parameters when reading.
- # This dict should be keyed by object (`Block`). Each entry is a list
- # of tuple. The first entry in each tuple is the parameter name. The
- # second entry is a dict with keys 'value' (for default value),
- # and 'label' (for a descriptive name).
- # Note that if the highest-level object requires parameters,
- # common_io_test will be skipped.
- read_params = {Block: []}
- # does not support write so no GUI stuff
- write_params = None
- name = 'Brainware SRC File'
- extensions = ['src']
- mode = 'file'
- def __init__(self, filename=None):
- """
- Arguments:
- filename: the filename
- """
- BaseIO.__init__(self)
- # log the __init__
- self.logger.info('__init__')
- # this stores the filename of the current object, exactly as it is
- # provided when the instance is initialized.
- self._filename = filename
- # this store the filename without the path
- self._file_origin = filename
- # This stores the file object for the current file
- self._fsrc = None
- # This stores the current Block
- self._blk = None
- # This stores the current ChannelIndex for easy access
- # It is equivalent to self._blk.channel_indexes[0]
- self._chx = None
- # This stores the current Segment for easy access
- # It is equivalent to self._blk.segments[-1]
- self._seg0 = None
- # this stores a dictionary of the Block's Units by name,
- # making it easier and faster to retrieve Units by name later
- # UnassignedSpikes and Units accessed by index are not stored here
- self._unitdict = {}
- # this stores the current Unit
- self._unit0 = None
- # if the file has a list with negative length, the rest of the file's
- # list lengths are unreliable, so we need to store this value for the
- # whole file
- self._damaged = False
- # this stores whether the current file is lazy loaded
- self._lazy = False
- # this stores whether the current file is cascading
- # this is false by default so if we use read_block on its own it works
- self._cascade = False
- # this stores an empty SpikeTrain which is used in various places.
- self._default_spiketrain = None
- @property
- def _isopen(self):
- """
- This property tells whether the SRC file associated with the IO object
- is open.
- """
- return self._fsrc is not None
- def _opensrc(self):
- """
- Open the file if it isn't already open.
- """
- # if the file isn't already open, open it and clear the Blocks
- if not self._fsrc or self._fsrc.closed:
- self._fsrc = open(self._filename, 'rb')
- # figure out the filename of the current file
- self._file_origin = os.path.basename(self._filename)
- def close(self):
- """
- Close the currently-open file and reset the current reading point.
- """
- self.logger.info('close')
- if self._isopen and not self._fsrc.closed:
- self._fsrc.close()
- # we also need to reset all per-file attributes
- self._damaged = False
- self._fsrc = None
- self._seg0 = None
- self._cascade = False
- self._file_origin = None
- self._lazy = False
- self._default_spiketrain = None
- def read(self, lazy=False, cascade=True, **kargs):
- """
- Reads the first Block from the Spike ReCording file "filename"
- generated with BrainWare.
- If you wish to read more than one Block, please use read_all_blocks.
- """
- return self.read_block(lazy=lazy, cascade=cascade, **kargs)
- def read_block(self, lazy=False, cascade=True, **kargs):
- """
- Reads the first Block from the Spike ReCording file "filename"
- generated with BrainWare.
- If you wish to read more than one Block, please use read_all_blocks.
- """
- # there are no keyargs implemented to so far. If someone tries to pass
- # them they are expecting them to do something or making a mistake,
- # neither of which should pass silently
- if kargs:
- raise NotImplementedError('This method does not have any '
- 'arguments implemented yet')
- blockobj = self.read_next_block(cascade=cascade, lazy=lazy)
- self.close()
- return blockobj
- def read_next_block(self, cascade=True, lazy=False, **kargs):
- """
- Reads a single Block from the Spike ReCording file "filename"
- generated with BrainWare.
- Each call of read will return the next Block until all Blocks are
- loaded. After the last Block, the file will be automatically closed
- and the progress reset. Call the close method manually to reset
- back to the first Block.
- """
- # there are no keyargs implemented to so far. If someone tries to pass
- # them they are expecting them to do something or making a mistake,
- # neither of which should pass silently
- if kargs:
- raise NotImplementedError('This method does not have any '
- 'arguments implemented yet')
- self._lazy = lazy
- self._opensrc()
- # create _default_spiketrain here for performance reasons
- self._default_spiketrain = self._init_default_spiketrain.copy()
- self._default_spiketrain.file_origin = self._file_origin
- if lazy:
- self._default_spiketrain.lazy_shape = (0,)
- # create the Block and the contents all Blocks of from IO share
- self._blk = Block(file_origin=self._file_origin)
- if not cascade:
- return self._blk
- self._chx = ChannelIndex(file_origin=self._file_origin,
- index=np.array([], dtype=np.int))
- self._seg0 = Segment(name='Comments', file_origin=self._file_origin)
- self._unit0 = Unit(name='UnassignedSpikes',
- file_origin=self._file_origin,
- elliptic=[], boundaries=[],
- timestamp=[], max_valid=[])
- self._blk.channel_indexes.append(self._chx)
- self._chx.units.append(self._unit0)
- self._blk.segments.append(self._seg0)
- # this actually reads the contents of the Block
- result = []
- while hasattr(result, '__iter__'):
- try:
- result = self._read_by_id()
- except:
- self.close()
- raise
- # since we read at a Block level we always do this
- self._blk.create_many_to_one_relationship()
- # put the Block in a local object so it can be gargabe collected
- blockobj = self._blk
- # reset the per-Block attributes
- self._blk = None
- self._chx = None
- self._unitdict = {}
- # combine the comments into one big event
- self._combine_segment_events(self._seg0)
- # result is None iff the end of the file is reached, so we can
- # close the file
- # this notification is not helpful if using the read method with
- # cascade==True, since the user will know it is done when the method
- # returns a value
- if result is None:
- self.logger.info('Last Block read. Closing file.')
- self.close()
- return blockobj
- def read_all_blocks(self, cascade=True, lazy=False, **kargs):
- """
- Reads all Blocks from the Spike ReCording file "filename"
- generated with BrainWare.
- The progress in the file is reset and the file closed then opened again
- prior to reading.
- The file is automatically closed after reading completes.
- """
- # there are no keyargs implemented to so far. If someone tries to pass
- # them they are expecting them to do something or making a mistake,
- # neither of which should pass silently
- if kargs:
- raise NotImplementedError('This method does not have any '
- 'argument implemented yet')
- self._lazy = lazy
- self._cascade = True
- self.close()
- self._opensrc()
- # Read each Block.
- # After the last Block self._isopen is set to False, so this make a
- # good way to determine when to stop
- blocks = []
- while self._isopen:
- try:
- blocks.append(self.read_next_block(cascade=cascade,
- lazy=lazy))
- except:
- self.close()
- raise
- return blocks
- def _convert_timestamp(self, timestamp, start_date=datetime(1899, 12, 30)):
- """
- _convert_timestamp(timestamp, start_date) - convert a timestamp in
- brainware src file units to a python datetime object.
- start_date defaults to 1899.12.30 (ISO format), which is the start date
- used by all BrainWare SRC data Blocks so far. If manually specified
- it should be a datetime object or any other object that can be added
- to a timedelta object.
- """
- # datetime + timedelta = datetime again.
- try:
- timestamp = convert_brainwaresrc_timestamp(timestamp, start_date)
- except OverflowError as err:
- timestamp = start_date
- self.logger.exception('_convert_timestamp overflow')
- return timestamp
- # -------------------------------------------------------------------------
- # -------------------------------------------------------------------------
- # All methods from here on are private. They are not intended to be used
- # on their own, although methods that could theoretically be called on
- # their own are marked as such. All private methods could be renamed,
- # combined, or split at any time. All private methods prefixed by
- # "__read" or "__skip" will alter the current place in the file.
- # -------------------------------------------------------------------------
- # -------------------------------------------------------------------------
- def _read_by_id(self):
- """
- Reader for generic data
- BrainWare SRC files are broken up into data sequences that are
- identified by an ID code. This method determines the ID code and calls
- the method to read the data sequence with that ID code. See the
- _ID_DICT attribute for a dictionary of code/method pairs.
- IMPORTANT!!!
- This is the only private method that can be called directly.
- The rest of the private methods can only safely be called by this
- method or by other private methods, since they depend on the
- current position in the file.
- """
- try:
- # uint16 -- the ID code of the next sequence
- seqid = np.asscalar(np.fromfile(self._fsrc,
- dtype=np.uint16, count=1))
- except ValueError:
- # return a None if at EOF. Other methods use None to recognize
- # an EOF
- return None
- # using the seqid, get the reader function from the reader dict
- readfunc = self._ID_DICT.get(seqid)
- if readfunc is None:
- if seqid <= 0:
- # return if end-of-sequence ID code. This has to be 0.
- # just calling "return" will return a None which is used as an
- # EOF indicator
- return 0
- else:
- # return a warning if the key is invalid
- # (this is consistent with the official behavior,
- # even the official reference files have invalid keys
- # when using the official reference reader matlab
- # scripts
- self.logger.warning('unknown ID: %s', seqid)
- return []
- try:
- # run the function to get the data
- return readfunc(self)
- except (EOFError, UnicodeDecodeError) as err:
- # return a warning if the EOF is reached in the middle of a method
- self.logger.exception('Premature end of file')
- return None
- # -------------------------------------------------------------------------
- # -------------------------------------------------------------------------
- # These are helper methods. They don't read from the file, so it
- # won't harm the reading process to call them, but they are only relevant
- # when used in other private methods.
- #
- # These are tuned to the particular needs of this IO class, they are
- # unlikely to work properly if used with another file format.
- # -------------------------------------------------------------------------
- # -------------------------------------------------------------------------
- def _assign_sequence(self, data_obj):
- """
- _assign_sequence(data_obj) - Try to guess where an unknown sequence
- should go based on its class. Warning are issued if this method is
- used since manual reorganization may be needed.
- """
- if isinstance(data_obj, Unit):
- self.logger.warning('Unknown Unit found, adding to Units list')
- self._chx.units.append(data_obj)
- if data_obj.name:
- self._unitdict[data_obj.name] = data_obj
- elif isinstance(data_obj, Segment):
- self.logger.warning('Unknown Segment found, '
- 'adding to Segments list')
- self._blk.segments.append(data_obj)
- elif isinstance(data_obj, Event):
- self.logger.warning('Unknown Event found, '
- 'adding to comment Events list')
- self._seg0.events.append(data_obj)
- elif isinstance(data_obj, SpikeTrain):
- self.logger.warning('Unknown SpikeTrain found, '
- 'adding to the UnassignedSpikes Unit')
- self._unit0.spiketrains.append(data_obj)
- elif hasattr(data_obj, '__iter__') and not isinstance(data_obj, str):
- for sub_obj in data_obj:
- self._assign_sequence(sub_obj)
- else:
- if self.logger.isEnabledFor(logging.WARNING):
- self.logger.warning('Unrecognized sequence of type %s found, '
- 'skipping', type(data_obj))
- _default_datetime = datetime(1, 1, 1)
- _default_t_start = pq.Quantity(0., units=pq.ms, dtype=np.float32)
- _init_default_spiketrain = SpikeTrain(times=pq.Quantity([], units=pq.ms,
- dtype=np.float32),
- t_start=pq.Quantity(0, units=pq.ms,
- dtype=np.float32
- ),
- t_stop=pq.Quantity(1, units=pq.ms,
- dtype=np.float32),
- waveforms=pq.Quantity([[[]]],
- dtype=np.int8,
- units=pq.mV),
- dtype=np.float32, copy=False,
- timestamp=_default_datetime,
- respwin=np.array([], dtype=np.int32),
- dama_index=-1,
- trig2=pq.Quantity([], units=pq.ms,
- dtype=np.uint8),
- side='')
- def _combine_events(self, events):
- """
- _combine_events(events) - combine a list of Events
- with single events into one long Event
- """
- if not events or self._lazy:
- event = Event(times=pq.Quantity([], units=pq.s),
- labels=np.array([], dtype='S'),
- senders=np.array([], dtype='S'),
- t_start=0)
- if self._lazy:
- event.lazy_shape = len(events)
- return event
- times = []
- labels = []
- senders = []
- for event in events:
- times.append(event.times.magnitude)
- labels.append(event.labels)
- senders.append(event.annotations['sender'])
- times = np.array(times, dtype=np.float32)
- t_start = times.min()
- times = pq.Quantity(times-t_start, units=pq.d).rescale(pq.s)
- labels = np.array(labels)
- senders = np.array(senders)
- event = Event(times=times, labels=labels,
- t_start=t_start.tolist(), senders=senders)
- return event
- def _combine_segment_events(self, segment):
- """
- _combine_segment_events(segment)
- Combine all Events in a segment.
- """
- event = self._combine_events(segment.events)
- event_t_start = event.annotations.pop('t_start')
- segment.rec_datetime = self._convert_timestamp(event_t_start)
- segment.events = [event]
- event.segment = segment
- def _combine_spiketrains(self, spiketrains):
- """
- _combine_spiketrains(spiketrains) - combine a list of SpikeTrains
- with single spikes into one long SpikeTrain
- """
- if not spiketrains:
- return self._default_spiketrain.copy()
- if hasattr(spiketrains[0], 'waveforms') and len(spiketrains) == 1:
- train = spiketrains[0]
- if self._lazy and not hasattr(train, 'lazy_shape'):
- train.lazy_shape = train.shape
- train = train[:0]
- return train
- if hasattr(spiketrains[0], 't_stop'):
- # workaround for bug in some broken files
- istrain = [hasattr(utrain, 'waveforms') for utrain in spiketrains]
- if not all(istrain):
- goodtrains = [itrain for i, itrain in enumerate(spiketrains)
- if istrain[i]]
- badtrains = [itrain for i, itrain in enumerate(spiketrains)
- if not istrain[i]]
- spiketrains = (goodtrains +
- [self._combine_spiketrains(badtrains)])
- spiketrains = [itrain for itrain in spiketrains if itrain.size > 0]
- if not spiketrains:
- return self._default_spiketrain.copy()
- # get the times of the spiketrains and combine them
- waveforms = [itrain.waveforms for itrain in spiketrains]
- rawtrains = np.array(np.concatenate(spiketrains, axis=1))
- times = pq.Quantity(rawtrains, units=pq.ms, copy=False)
- lens1 = np.array([wave.shape[1] for wave in waveforms])
- lens2 = np.array([wave.shape[2] for wave in waveforms])
- if lens1.max() != lens1.min() or lens2.max() != lens2.min():
- lens1 = lens1.max() - lens1
- lens2 = lens2.max() - lens2
- waveforms = [np.pad(waveform,
- ((0, 0), (0, len1), (0, len2)),
- 'constant')
- for waveform, len1, len2 in zip(waveforms,
- lens1,
- lens2)]
- waveforms = np.concatenate(waveforms, axis=0)
- # extract the trig2 annotation
- trig2 = np.array(np.concatenate([itrain.annotations['trig2'] for
- itrain in spiketrains], axis=1))
- trig2 = pq.Quantity(trig2, units=pq.ms)
- elif hasattr(spiketrains[0], 'units'):
- return self._combine_spiketrains([spiketrains])
- else:
- times, waveforms, trig2 = zip(*spiketrains)
- times = np.concatenate(times, axis=0)
- # get the times of the SpikeTrains and combine them
- times = pq.Quantity(times, units=pq.ms, copy=False)
- # get the waveforms of the SpikeTrains and combine them
- # these should be a 3D array with the first axis being the spike,
- # the second axis being the recording channel (there is only one),
- # and the third axis being the actual waveform
- waveforms = np.concatenate(waveforms, axis=0)
- # extract the trig2 annotation
- trig2 = pq.Quantity(np.hstack(trig2),
- units=pq.ms, copy=False)
- if not times.size:
- return self._default_spiketrain.copy()
- # get the maximum time
- t_stop = times[-1] * 2.
- if self._lazy:
- timesshape = times.shape
- times = pq.Quantity([], units=pq.ms, copy=False)
- waveforms = pq.Quantity([[[]]], units=pq.mV)
- else:
- waveforms = pq.Quantity(waveforms, units=pq.mV, copy=False)
- train = SpikeTrain(times=times, copy=False,
- t_start=self._default_t_start.copy(), t_stop=t_stop,
- file_origin=self._file_origin,
- waveforms=waveforms,
- timestamp=self._default_datetime,
- respwin=np.array([], dtype=np.int32),
- dama_index=-1, trig2=trig2, side='')
- if self._lazy:
- train.lazy_shape = timesshape
- return train
- # -------------------------------------------------------------------------
- # -------------------------------------------------------------------------
- # IMPORTANT!!!
- # These are private methods implementing the internal reading mechanism.
- # Due to the way BrainWare SRC files are structured, they CANNOT be used
- # on their own. Calling these manually will almost certainly alter your
- # position in the file in an unrecoverable manner, whether they throw
- # an exception or not.
- # -------------------------------------------------------------------------
- # -------------------------------------------------------------------------
- def __read_str(self, numchars=1, utf=None):
- """
- Read a string of a specific length.
- This is compatible with python 2 and python 3.
- """
- rawstr = np.asscalar(np.fromfile(self._fsrc,
- dtype='S%s' % numchars, count=1))
- if utf or (utf is None and PY_VER == 3):
- return rawstr.decode('utf-8')
- return rawstr
- def __read_annotations(self):
- """
- Read the stimulus grid properties.
- -------------------------------------------------------------------
- Returns a dictionary containing the parameter names as keys and the
- parameter values as values.
- The returned object must be added to the Block.
- ID: 29109
- """
- # int16 -- number of stimulus parameters
- numelements = np.fromfile(self._fsrc, dtype=np.int16, count=1)[0]
- if not numelements:
- return {}
- # [data sequence] * numelements -- parameter names
- names = []
- for i in range(numelements):
- # {skip} = byte (char) -- skip one byte
- self._fsrc.seek(1, 1)
- # uint8 -- length of next string
- numchars = np.asscalar(np.fromfile(self._fsrc,
- dtype=np.uint8, count=1))
- # if there is no name, make one up
- if not numchars:
- name = 'param%s' % i
- else:
- # char * numchars -- parameter name string
- name = self.__read_str(numchars)
- # if the name is already in there, add a unique number to it
- # so it isn't overwritten
- if name in names:
- name = name + str(i)
- names.append(name)
- # float32 * numelements -- an array of parameter values
- values = np.fromfile(self._fsrc, dtype=np.float32,
- count=numelements)
- # combine the names and values into a dict
- # the dict will be added to the annotations
- annotations = dict(zip(names, values))
- return annotations
- def __read_annotations_old(self):
- """
- Read the stimulus grid properties.
- Returns a dictionary containing the parameter names as keys and the
- parameter values as values.
- ------------------------------------------------
- The returned objects must be added to the Block.
- This reads an old version of the format that does not store paramater
- names, so placeholder names are created instead.
- ID: 29099
- """
- # int16 * 14 -- an array of parameter values
- values = np.fromfile(self._fsrc, dtype=np.int16, count=14)
- # create dummy names and combine them with the values in a dict
- # the dict will be added to the annotations
- params = ['param%s' % i for i in range(len(values))]
- annotations = dict(zip(params, values))
- return annotations
- def __read_comment(self):
- """
- Read a single comment.
- The comment is stored as an Event in Segment 0, which is
- specifically for comments.
- ----------------------
- Returns an empty list.
- The returned object is already added to the Block.
- No ID number: always called from another method
- """
- # float64 -- timestamp (number of days since dec 30th 1899)
- time = np.fromfile(self._fsrc, dtype=np.double, count=1)[0]
- # int16 -- length of next string
- numchars1 = np.asscalar(np.fromfile(self._fsrc,
- dtype=np.int16, count=1))
- # char * numchars -- the one who sent the comment
- sender = self.__read_str(numchars1)
- # int16 -- length of next string
- numchars2 = np.asscalar(np.fromfile(self._fsrc,
- dtype=np.int16, count=1))
- # char * numchars -- comment text
- text = self.__read_str(numchars2, utf=False)
- comment = Event(times=pq.Quantity(time, units=pq.d), labels=text,
- sender=sender, file_origin=self._file_origin)
- self._seg0.events.append(comment)
- return []
- def __read_list(self):
- """
- Read a list of arbitrary data sequences
- It only says how many data sequences should be read. These sequences
- are then read by their ID number.
- Note that lists can be nested.
- If there are too many sequences (for instance if there are a large
- number of spikes in a Segment) then a negative number will be returned
- for the number of data sequences to read. In this case the method
- tries to guess. This also means that all future list data sequences
- have unreliable lengths as well.
- -------------------------------------------
- Returns a list of objects.
- Whether these objects need to be added to the Block depends on the
- object in question.
- There are several data sequences that have identical formats but are
- used in different situations. That means this data sequences has
- multiple ID numbers.
- ID: 29082
- ID: 29083
- ID: 29091
- ID: 29093
- """
- # int16 -- number of sequences to read
- numelements = np.fromfile(self._fsrc, dtype=np.int16, count=1)[0]
- # {skip} = bytes * 4 (int16 * 2) -- skip four bytes
- self._fsrc.seek(4, 1)
- if numelements == 0:
- return []
- if not self._damaged and numelements < 0:
- self._damaged = True
- self.logger.error('Negative sequence count %s, file damaged',
- numelements)
- if not self._damaged:
- # read the sequences into a list
- seq_list = [self._read_by_id() for _ in range(numelements)]
- else:
- # read until we get some indication we should stop
- seq_list = []
- # uint16 -- the ID of the next sequence
- seqidinit = np.fromfile(self._fsrc, dtype=np.uint16, count=1)[0]
- # {rewind} = byte * 2 (int16) -- move back 2 bytes, i.e. go back to
- # before the beginning of the seqid
- self._fsrc.seek(-2, 1)
- while 1:
- # uint16 -- the ID of the next sequence
- seqid = np.fromfile(self._fsrc, dtype=np.uint16, count=1)[0]
- # {rewind} = byte * 2 (int16) -- move back 2 bytes, i.e. go
- # back to before the beginning of the seqid
- self._fsrc.seek(-2, 1)
- # if we come across a new sequence, we are at the end of the
- # list so we should stop
- if seqidinit != seqid:
- break
- # otherwise read the next sequence
- seq_list.append(self._read_by_id())
- return seq_list
- def __read_segment(self):
- """
- Read an individual Segment.
- A Segment contains a dictionary of parameters, the length of the
- recording, a list of Units with their Spikes, and a list of Spikes
- not assigned to any Unit. The unassigned spikes are always stored in
- Unit 0, which is exclusively for storing these spikes.
- -------------------------------------------------
- Returns the Segment object created by the method.
- The returned object is already added to the Block.
- ID: 29106
- """
- # (data_obj) -- the stimulus parameters for this segment
- annotations = self._read_by_id()
- annotations['feature_type'] = -1
- annotations['go_by_closest_unit_center'] = False
- annotations['include_unit_bounds'] = False
- # (data_obj) -- SpikeTrain list of unassigned spikes
- # these go in the first Unit since it is for unassigned spikes
- unassigned_spikes = self._read_by_id()
- self._unit0.spiketrains.extend(unassigned_spikes)
- # read a list of units and grab the second return value, which is the
- # SpikeTrains from this Segment (if we use the Unit we will get all the
- # SpikeTrains from that Unit, resuling in duplicates if we are past
- # the first Segment
- trains = self._read_by_id()
- if not trains:
- if unassigned_spikes:
- # if there are no assigned spikes,
- # just use the unassigned spikes
- trains = zip(unassigned_spikes)
- else:
- # if there are no spiketrains at all,
- # create an empty spike train
- trains = [[self._default_spiketrain.copy()]]
- elif hasattr(trains[0], 'dtype'):
- # workaround for some broken files
- trains = [unassigned_spikes +
- [self._combine_spiketrains([trains])]]
- else:
- # get the second element from each returned value,
- # which is the actual SpikeTrains
- trains = [unassigned_spikes] + [train[1] for train in trains]
- # re-organize by sweeps
- trains = zip(*trains)
- # int32 -- SpikeTrain length in ms
- spiketrainlen = pq.Quantity(np.fromfile(self._fsrc, dtype=np.int32,
- count=1)[0], units=pq.ms, copy=False)
- segments = []
- for train in trains:
- # create the Segment and add everything to it
- segment = Segment(file_origin=self._file_origin,
- **annotations)
- segment.spiketrains = train
- self._blk.segments.append(segment)
- segments.append(segment)
- for itrain in train:
- # use the SpikeTrain length to figure out the stop time
- # t_start is always 0 so we can ignore it
- itrain.t_stop = spiketrainlen
- return segments
- def __read_segment_list(self):
- """
- Read a list of Segments with comments.
- Since comments can occur at any point, whether a recording is happening
- or not, it is impossible to reliably assign them to a specific Segment.
- For this reason they are always assigned to Segment 0, which is
- exclusively used to store comments.
- --------------------------------------------------------
- Returns a list of the Segments created with this method.
- The returned objects are already added to the Block.
- ID: 29112
- """
- # uint8 -- number of electrode channels in the Segment
- numchannels = np.fromfile(self._fsrc, dtype=np.uint8, count=1)[0]
- # [list of sequences] -- individual Segments
- segments = self.__read_list()
- while not hasattr(segments[0], 'spiketrains'):
- segments = list(chain(*segments))
- # char -- "side of brain" info
- side = self.__read_str(1)
- # int16 -- number of comments
- numelements = np.fromfile(self._fsrc, dtype=np.int16, count=1)[0]
- # comment_obj * numelements -- comments about the Segments
- # we don't know which Segment specifically, though
- for _ in range(numelements):
- self.__read_comment()
- # create a channel_index for the numchannels
- self._chx.index = np.arange(numchannels)
- self._chx.channel_names = np.array(['Chan{}'.format(i) for i in range(numchannels)], dtype='S')
- # store what side of the head we are dealing with
- for segment in segments:
- for spiketrain in segment.spiketrains:
- spiketrain.annotations['side'] = side
- return segments
- def __read_segment_list_v8(self):
- """
- Read a list of Segments with comments.
- This is version 8 of the data sequence.
- This is the same as __read_segment_list_var, but can also contain
- one or more arbitrary sequences. The class makes an attempt to assign
- the sequences when possible, and warns the user when this happens
- (see the _assign_sequence method)
- --------------------------------------------------------
- Returns a list of the Segments created with this method.
- The returned objects are already added to the Block.
- ID: 29117
- """
- # segment_collection_var -- this is based off a segment_collection_var
- segments = self.__read_segment_list_var()
- # uint16 -- the ID of the next sequence
- seqid = np.fromfile(self._fsrc, dtype=np.uint16, count=1)[0]
- # {rewind} = byte * 2 (int16) -- move back 2 bytes, i.e. go back to
- # before the beginning of the seqid
- self._fsrc.seek(-2, 1)
- if seqid in self._ID_DICT:
- # if it is a valid seqid, read it and try to figure out where
- # to put it
- self._assign_sequence(self._read_by_id())
- else:
- # otherwise it is a Unit list
- self.__read_unit_list()
- # {skip} = byte * 2 (int16) -- skip 2 bytes
- self._fsrc.seek(2, 1)
- return segments
- def __read_segment_list_v9(self):
- """
- Read a list of Segments with comments.
- This is version 9 of the data sequence.
- This is the same as __read_segment_list_v8, but contains some
- additional annotations. These annotations are added to the Segment.
- --------------------------------------------------------
- Returns a list of the Segments created with this method.
- The returned objects are already added to the Block.
- ID: 29120
- """
- # segment_collection_v8 -- this is based off a segment_collection_v8
- segments = self.__read_segment_list_v8()
- # uint8
- feature_type = np.fromfile(self._fsrc, dtype=np.uint8,
- count=1)[0]
- # uint8
- go_by_closest_unit_center = np.fromfile(self._fsrc, dtype=np.bool8,
- count=1)[0]
- # uint8
- include_unit_bounds = np.fromfile(self._fsrc, dtype=np.bool8,
- count=1)[0]
- # create a dictionary of the annotations
- annotations = {'feature_type': feature_type,
- 'go_by_closest_unit_center': go_by_closest_unit_center,
- 'include_unit_bounds': include_unit_bounds}
- # add the annotations to each Segment
- for segment in segments:
- segment.annotations.update(annotations)
- return segments
- def __read_segment_list_var(self):
- """
- Read a list of Segments with comments.
- This is the same as __read_segment_list, but contains information
- regarding the sampling period. This information is added to the
- SpikeTrains in the Segments.
- --------------------------------------------------------
- Returns a list of the Segments created with this method.
- The returned objects are already added to the Block.
- ID: 29114
- """
- # float32 -- DA conversion clock period in microsec
- sampling_period = pq.Quantity(np.fromfile(self._fsrc,
- dtype=np.float32, count=1),
- units=pq.us, copy=False)[0]
- # segment_collection -- this is based off a segment_collection
- segments = self.__read_segment_list()
- # add the sampling period to each SpikeTrain
- for segment in segments:
- for spiketrain in segment.spiketrains:
- spiketrain.sampling_period = sampling_period
- return segments
- def __read_spike_fixed(self, numpts=40):
- """
- Read a spike with a fixed waveform length (40 time bins)
- -------------------------------------------
- Returns the time, waveform and trig2 value.
- The returned objects must be converted to a SpikeTrain then
- added to the Block.
- ID: 29079
- """
- # float32 -- spike time stamp in ms since start of SpikeTrain
- time = np.fromfile(self._fsrc, dtype=np.float32, count=1)
- # int8 * 40 -- spike shape -- use numpts for spike_var
- waveform = np.fromfile(self._fsrc, dtype=np.int8,
- count=numpts).reshape(1, 1, numpts)
- # uint8 -- point of return to noise
- trig2 = np.fromfile(self._fsrc, dtype=np.uint8, count=1)
- return time, waveform, trig2
- def __read_spike_fixed_old(self):
- """
- Read a spike with a fixed waveform length (40 time bins)
- This is an old version of the format. The time is stored as ints
- representing 1/25 ms time steps. It has no trigger information.
- -------------------------------------------
- Returns the time, waveform and trig2 value.
- The returned objects must be converted to a SpikeTrain then
- added to the Block.
- ID: 29081
- """
- # int32 -- spike time stamp in ms since start of SpikeTrain
- time = np.fromfile(self._fsrc, dtype=np.int32, count=1) / 25.
- time = time.astype(np.float32)
- # int8 * 40 -- spike shape
- # This needs to be a 3D array, one for each channel. BrainWare
- # only ever has a single channel per file.
- waveform = np.fromfile(self._fsrc, dtype=np.int8,
- count=40).reshape(1, 1, 40)
- # create a dummy trig2 value
- trig2 = np.array([-1], dtype=np.uint8)
- return time, waveform, trig2
- def __read_spike_var(self):
- """
- Read a spike with a variable waveform length
- -------------------------------------------
- Returns the time, waveform and trig2 value.
- The returned objects must be converted to a SpikeTrain then
- added to the Block.
- ID: 29115
- """
- # uint8 -- number of points in spike shape
- numpts = np.fromfile(self._fsrc, dtype=np.uint8, count=1)[0]
- # spike_fixed is the same as spike_var if you don't read the numpts
- # byte and set numpts = 40
- return self.__read_spike_fixed(numpts)
- def __read_spiketrain_indexed(self):
- """
- Read a SpikeTrain
- This is the same as __read_spiketrain_timestamped except it also
- contains the index of the Segment in the dam file.
- The index is stored as an annotation in the SpikeTrain.
- -------------------------------------------------
- Returns a SpikeTrain object with multiple spikes.
- The returned object must be added to the Block.
- ID: 29121
- """
- # int32 -- index of the analogsignalarray in corresponding .dam file
- dama_index = np.fromfile(self._fsrc, dtype=np.int32,
- count=1)[0]
- # spiketrain_timestamped -- this is based off a spiketrain_timestamped
- spiketrain = self.__read_spiketrain_timestamped()
- # add the property to the dict
- spiketrain.annotations['dama_index'] = dama_index
- return spiketrain
- def __read_spiketrain_timestamped(self):
- """
- Read a SpikeTrain
- This SpikeTrain contains a time stamp for when it was recorded
- The timestamp is stored as an annotation in the SpikeTrain.
- -------------------------------------------------
- Returns a SpikeTrain object with multiple spikes.
- The returned object must be added to the Block.
- ID: 29110
- """
- # float64 -- timeStamp (number of days since dec 30th 1899)
- timestamp = np.fromfile(self._fsrc, dtype=np.double, count=1)[0]
- # convert to datetime object
- timestamp = self._convert_timestamp(timestamp)
- # seq_list -- spike list
- # combine the spikes into a single SpikeTrain
- spiketrain = self._combine_spiketrains(self.__read_list())
- # add the timestamp
- spiketrain.annotations['timestamp'] = timestamp
- return spiketrain
- def __read_unit(self):
- """
- Read all SpikeTrains from a single Segment and Unit
- This is the same as __read_unit_unsorted except it also contains
- information on the spike sorting boundaries.
- ------------------------------------------------------------------
- Returns a single Unit and a list of SpikeTrains from that Unit and
- current Segment, in that order. The SpikeTrains must be returned since
- it is not possible to determine from the Unit which SpikeTrains are
- from the current Segment.
- The returned objects are already added to the Block. The SpikeTrains
- must be added to the current Segment.
- ID: 29116
- """
- # same as unsorted Unit
- unit, trains = self.__read_unit_unsorted()
- # float32 * 18 -- Unit boundaries (IEEE 32-bit floats)
- unit.annotations['boundaries'] = [np.fromfile(self._fsrc,
- dtype=np.float32,
- count=18)]
- # uint8 * 9 -- boolean values indicating elliptic feature boundary
- # dimensions
- unit.annotations['elliptic'] = [np.fromfile(self._fsrc,
- dtype=np.uint8,
- count=9)]
- return unit, trains
- def __read_unit_list(self):
- """
- A list of a list of Units
- -----------------------------------------------
- Returns a list of Units modified in the method.
- The returned objects are already added to the Block.
- No ID number: only called by other methods
- """
- # this is used to figure out which Units to return
- maxunit = 1
- # int16 -- number of time slices
- numelements = np.fromfile(self._fsrc, dtype=np.int16, count=1)[0]
- # {sequence} * numelements1 -- the number of lists of Units to read
- self._chx.annotations['max_valid'] = []
- for i in range(numelements):
- # {skip} = byte * 2 (int16) -- skip 2 bytes
- self._fsrc.seek(2, 1)
- # double
- max_valid = np.fromfile(self._fsrc, dtype=np.double, count=1)[0]
- # int16 - the number of Units to read
- numunits = np.fromfile(self._fsrc, dtype=np.int16, count=1)[0]
- # update tha maximum Unit so far
- maxunit = max(maxunit, numunits + 1)
- # if there aren't enough Units, create them
- # remember we need to skip the UnassignedSpikes Unit
- if numunits > len(self._chx.units) + 1:
- for ind1 in range(len(self._chx.units), numunits + 1):
- unit = Unit(name='unit%s' % ind1,
- file_origin=self._file_origin,
- elliptic=[], boundaries=[],
- timestamp=[], max_valid=[])
- self._chx.units.append(unit)
- # {Block} * numelements -- Units
- for ind1 in range(numunits):
- # get the Unit with the given index
- # remember we need to skip the UnassignedSpikes Unit
- unit = self._chx.units[ind1 + 1]
- # {skip} = byte * 2 (int16) -- skip 2 bytes
- self._fsrc.seek(2, 1)
- # int16 -- a multiplier for the elliptic and boundaries
- # properties
- numelements3 = np.fromfile(self._fsrc, dtype=np.int16,
- count=1)[0]
- # uint8 * 10 * numelements3 -- boolean values indicating
- # elliptic feature boundary dimensions
- elliptic = np.fromfile(self._fsrc, dtype=np.uint8,
- count=10 * numelements3)
- # float32 * 20 * numelements3 -- feature boundaries
- boundaries = np.fromfile(self._fsrc, dtype=np.float32,
- count=20 * numelements3)
- unit.annotations['elliptic'].append(elliptic)
- unit.annotations['boundaries'].append(boundaries)
- unit.annotations['max_valid'].append(max_valid)
- return self._chx.units[1:maxunit]
- def __read_unit_list_timestamped(self):
- """
- A list of a list of Units.
- This is the same as __read_unit_list, except that it also has a
- timestamp. This is added ad an annotation to all Units.
- -----------------------------------------------
- Returns a list of Units modified in the method.
- The returned objects are already added to the Block.
- ID: 29119
- """
- # double -- time zero (number of days since dec 30th 1899)
- timestamp = np.fromfile(self._fsrc, dtype=np.double, count=1)[0]
- # convert to to days since UNIX epoc time:
- timestamp = self._convert_timestamp(timestamp)
- # sorter -- this is based off a sorter
- units = self.__read_unit_list()
- for unit in units:
- unit.annotations['timestamp'].append(timestamp)
- return units
- def __read_unit_old(self):
- """
- Read all SpikeTrains from a single Segment and Unit
- This is the same as __read_unit_unsorted except it also contains
- information on the spike sorting boundaries.
- This is an old version of the format that used 48-bit floating-point
- numbers for the boundaries. These cannot easily be read and so are
- skipped.
- ------------------------------------------------------------------
- Returns a single Unit and a list of SpikeTrains from that Unit and
- current Segment, in that order. The SpikeTrains must be returned since
- it is not possible to determine from the Unit which SpikeTrains are
- from the current Segment.
- The returned objects are already added to the Block. The SpikeTrains
- must be added to the current Segment.
- ID: 29107
- """
- # same as Unit
- unit, trains = self.__read_unit_unsorted()
- # bytes * 108 (float48 * 18) -- Unit boundaries (48-bit floating
- # point numbers are not supported so we skip them)
- self._fsrc.seek(108, 1)
- # uint8 * 9 -- boolean values indicating elliptic feature boundary
- # dimensions
- unit.annotations['elliptic'] = np.fromfile(self._fsrc, dtype=np.uint8,
- count=9).tolist()
- return unit, trains
- def __read_unit_unsorted(self):
- """
- Read all SpikeTrains from a single Segment and Unit
- This does not contain Unit boundaries.
- ------------------------------------------------------------------
- Returns a single Unit and a list of SpikeTrains from that Unit and
- current Segment, in that order. The SpikeTrains must be returned since
- it is not possible to determine from the Unit which SpikeTrains are
- from the current Segment.
- The returned objects are already added to the Block. The SpikeTrains
- must be added to the current Segment.
- ID: 29084
- """
- # {skip} = bytes * 2 (uint16) -- skip two bytes
- self._fsrc.seek(2, 1)
- # uint16 -- number of characters in next string
- numchars = np.asscalar(np.fromfile(self._fsrc,
- dtype=np.uint16, count=1))
- # char * numchars -- ID string of Unit
- name = self.__read_str(numchars)
- # int32 -- SpikeTrain length in ms
- # int32 * 4 -- response and spon period boundaries
- parts = np.fromfile(self._fsrc, dtype=np.int32, count=5)
- t_stop = pq.Quantity(parts[0].astype('float32'),
- units=pq.ms, copy=False)
- respwin = parts[1:]
- # (data_obj) -- list of SpikeTrains
- spikeslists = self._read_by_id()
- # use the Unit if it already exists, otherwise create it
- if name in self._unitdict:
- unit = self._unitdict[name]
- else:
- unit = Unit(name=name, file_origin=self._file_origin,
- elliptic=[], boundaries=[], timestamp=[], max_valid=[])
- self._chx.units.append(unit)
- self._unitdict[name] = unit
- # convert the individual spikes to SpikeTrains and add them to the Unit
- trains = [self._combine_spiketrains(spikes) for spikes in spikeslists]
- unit.spiketrains.extend(trains)
- for train in trains:
- train.t_stop = t_stop.copy()
- train.annotations['respwin'] = respwin.copy()
- return unit, trains
- def __skip_information(self):
- """
- Read an information sequence.
- This is data sequence is skipped both here and in the Matlab reference
- implementation.
- ----------------------
- Returns an empty list
- Nothing is created so nothing is added to the Block.
- ID: 29113
- """
- # {skip} char * 34 -- display information
- self._fsrc.seek(34, 1)
- return []
- def __skip_information_old(self):
- """
- Read an information sequence
- This is data sequence is skipped both here and in the Matlab reference
- implementation
- This is an old version of the format
- ----------------------
- Returns an empty list.
- Nothing is created so nothing is added to the Block.
- ID: 29100
- """
- # {skip} char * 4 -- display information
- self._fsrc.seek(4, 1)
- return []
- # This dictionary maps the numeric data sequence ID codes to the data
- # sequence reading functions.
- #
- # Since functions are first-class objects in Python, the functions returned
- # from this dictionary are directly callable.
- #
- # If new data sequence ID codes are added in the future please add the code
- # here in numeric order and the method above in alphabetical order
- #
- # The naming of any private method may change at any time
- _ID_DICT = {29079: __read_spike_fixed,
- 29081: __read_spike_fixed_old,
- 29082: __read_list,
- 29083: __read_list,
- 29084: __read_unit_unsorted,
- 29091: __read_list,
- 29093: __read_list,
- 29099: __read_annotations_old,
- 29100: __skip_information_old,
- 29106: __read_segment,
- 29107: __read_unit_old,
- 29109: __read_annotations,
- 29110: __read_spiketrain_timestamped,
- 29112: __read_segment_list,
- 29113: __skip_information,
- 29114: __read_segment_list_var,
- 29115: __read_spike_var,
- 29116: __read_unit,
- 29117: __read_segment_list_v8,
- 29119: __read_unit_list_timestamped,
- 29120: __read_segment_list_v9,
- 29121: __read_spiketrain_indexed
- }
- def convert_brainwaresrc_timestamp(timestamp,
- start_date=datetime(1899, 12, 30)):
- """
- convert_brainwaresrc_timestamp(timestamp, start_date) - convert a timestamp
- in brainware src file units to a python datetime object.
- start_date defaults to 1899.12.30 (ISO format), which is the start date
- used by all BrainWare SRC data Blocks so far. If manually specified
- it should be a datetime object or any other object that can be added
- to a timedelta object.
- """
- # datetime + timedelta = datetime again.
- return start_date + timedelta(days=timestamp)
- if __name__ == '__main__':
- # run this when calling the file directly as a benchmark
- from neo.test.iotest.test_brainwaresrcio import FILES_TO_TEST
- from neo.test.iotest.common_io_test import url_for_tests
- from neo.test.iotest.tools import (create_local_temp_dir,
- download_test_file,
- get_test_file_full_path,
- make_all_directories)
- shortname = BrainwareSrcIO.__name__.lower().strip('io')
- local_test_dir = create_local_temp_dir(shortname)
- url = url_for_tests+shortname
- FILES_TO_TEST.remove('long_170s_1rep_1clust_ch2.src')
- make_all_directories(FILES_TO_TEST, local_test_dir)
- download_test_file(FILES_TO_TEST, local_test_dir, url)
- for path in get_test_file_full_path(ioclass=BrainwareSrcIO,
- filename=FILES_TO_TEST,
- directory=local_test_dir):
- ioobj = BrainwareSrcIO(path)
- ioobj.read_all_blocks(lazy=False)
- ioobj.read_all_blocks(lazy=True)
|