123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251 |
- # -*- coding: utf-8 -*-
- '''
- Class for reading from Brainware DAM files
- DAM files are binary files for holding raw data. They are broken up into
- sequence of Segments, each containing a single raw trace and parameters.
- The DAM file does NOT contain a sampling rate, nor can it be reliably
- calculated from any of the parameters. You can calculate it from
- the "sweep length" attribute if it is present, but it isn't always present.
- It is more reliable to get it from the corresponding SRC file or F32 file if
- you have one.
- The DAM file also does not divide up data into Blocks, so only a single
- Block is returned..
- Brainware was developed by Dr. Jan Schnupp and is availabe from
- Tucker Davis Technologies, Inc.
- http://www.tdt.com/downloads.htm
- Neither Dr. Jan Schnupp nor Tucker Davis Technologies, Inc. had any part in the
- development of this code
- The code is implemented with the permission of Dr. Jan Schnupp
- Author: Todd Jennings
- '''
- # needed for python 3 compatibility
- from __future__ import absolute_import, division, print_function
- # import needed core python modules
- import os
- import os.path
- # numpy and quantities are already required by neo
- import numpy as np
- import quantities as pq
- # needed core neo modules
- from neo.core import (AnalogSignal, Block,
- ChannelIndex, Segment)
- # need to subclass BaseIO
- from neo.io.baseio import BaseIO
- class BrainwareDamIO(BaseIO):
- """
- Class for reading Brainware raw data files with the extension '.dam'.
- The read_block method returns the first Block of the file. It will
- automatically close the file after reading.
- The read method is the same as read_block.
- Note:
- The file format does not contain a sampling rate. The sampling rate
- is set to 1 Hz, but this is arbitrary. If you have a corresponding .src
- or .f32 file, you can get the sampling rate from that. It may also be
- possible to infer it from the attributes, such as "sweep length", if
- present.
- Usage:
- >>> from neo.io.brainwaredamio import BrainwareDamIO
- >>> damfile = BrainwareDamIO(filename='multi_500ms_mulitrep_ch1.dam')
- >>> blk1 = damfile.read()
- >>> blk2 = damfile.read_block()
- >>> print blk1.segments
- >>> print blk1.segments[0].analogsignals
- >>> print blk1.units
- >>> print blk1.units[0].name
- >>> print blk2
- >>> print blk2[0].segments
- """
- is_readable = True # This class can only read data
- is_writable = False # write is not supported
- # This class is able to directly or indirectly handle the following objects
- # You can notice that this greatly simplifies the full Neo object hierarchy
- supported_objects = [Block, ChannelIndex,
- Segment, AnalogSignal]
- readable_objects = [Block]
- writeable_objects = []
- has_header = False
- is_streameable = False
- # This is for GUI stuff: a definition for parameters when reading.
- # This dict should be keyed by object (`Block`). Each entry is a list
- # of tuple. The first entry in each tuple is the parameter name. The
- # second entry is a dict with keys 'value' (for default value),
- # and 'label' (for a descriptive name).
- # Note that if the highest-level object requires parameters,
- # common_io_test will be skipped.
- read_params = {Block: []}
- # do not support write so no GUI stuff
- write_params = None
- name = 'Brainware DAM File'
- extensions = ['dam']
- mode = 'file'
- def __init__(self, filename=None):
- '''
- Arguments:
- filename: the filename
- '''
- BaseIO.__init__(self)
- self._path = filename
- self._filename = os.path.basename(filename)
- self._fsrc = None
- def read(self, lazy=False, cascade=True, **kargs):
- '''
- Reads raw data file "fname" generated with BrainWare
- '''
- return self.read_block(lazy=lazy, cascade=cascade)
- def read_block(self, lazy=False, cascade=True, **kargs):
- '''
- Reads a block from the raw data file "fname" generated
- with BrainWare
- '''
- # there are no keyargs implemented to so far. If someone tries to pass
- # them they are expecting them to do something or making a mistake,
- # neither of which should pass silently
- if kargs:
- raise NotImplementedError('This method does not have any '
- 'arguments implemented yet')
- self._fsrc = None
- block = Block(file_origin=self._filename)
- # if we aren't doing cascade, don't load anything
- if not cascade:
- return block
- # create the objects to store other objects
- chx = ChannelIndex(file_origin=self._filename,
- channel_ids=np.array([1]),
- index=np.array([0]),
- channel_names=np.array(['Chan1'], dtype='S'))
- # load objects into their containers
- block.channel_indexes.append(chx)
- # open the file
- with open(self._path, 'rb') as fobject:
- # while the file is not done keep reading segments
- while True:
- seg = self._read_segment(fobject, lazy)
- # if there are no more Segments, stop
- if not seg:
- break
- # store the segment and signals
- seg.analogsignals[0].channel_index = chx
- block.segments.append(seg)
- # remove the file object
- self._fsrc = None
- block.create_many_to_one_relationship()
- return block
- # -------------------------------------------------------------------------
- # -------------------------------------------------------------------------
- # IMPORTANT!!!
- # These are private methods implementing the internal reading mechanism.
- # Due to the way BrainWare DAM files are structured, they CANNOT be used
- # on their own. Calling these manually will almost certainly alter your
- # position in the file in an unrecoverable manner, whether they throw
- # an exception or not.
- # -------------------------------------------------------------------------
- # -------------------------------------------------------------------------
- def _read_segment(self, fobject, lazy):
- '''
- Read a single segment with a single analogsignal
- Returns the segment or None if there are no more segments
- '''
- try:
- # float64 -- start time of the AnalogSignal
- t_start = np.fromfile(fobject, dtype=np.float64, count=1)[0]
- except IndexError:
- # if there are no more Segments, return
- return False
- # int16 -- index of the stimulus parameters
- seg_index = np.fromfile(fobject, dtype=np.int16, count=1)[0].tolist()
- # int16 -- number of stimulus parameters
- numelements = np.fromfile(fobject, dtype=np.int16, count=1)[0]
- # read the name strings for the stimulus parameters
- paramnames = []
- for _ in range(numelements):
- # unit8 -- the number of characters in the string
- numchars = np.fromfile(fobject, dtype=np.uint8, count=1)[0]
- # char * numchars -- a single name string
- name = np.fromfile(fobject, dtype=np.uint8, count=numchars)
- # exclude invalid characters
- name = str(name[name >= 32].view('c').tostring())
- # add the name to the list of names
- paramnames.append(name)
- # float32 * numelements -- the values for the stimulus parameters
- paramvalues = np.fromfile(fobject, dtype=np.float32, count=numelements)
- # combine parameter names and the parameters as a dict
- params = dict(zip(paramnames, paramvalues))
- # int32 -- the number elements in the AnalogSignal
- numpts = np.fromfile(fobject, dtype=np.int32, count=1)[0]
- # int16 * numpts -- the AnalogSignal itself
- signal = np.fromfile(fobject, dtype=np.int16, count=numpts)
- # handle lazy loading
- if lazy:
- sig = AnalogSignal([], t_start=t_start*pq.d,
- file_origin=self._filename,
- sampling_period=1.*pq.s,
- units=pq.mV,
- dtype=np.float)
- sig.lazy_shape = len(signal)
- else:
- sig = AnalogSignal(signal.astype(np.float)*pq.mV,
- t_start=t_start*pq.d,
- file_origin=self._filename,
- sampling_period=1.*pq.s,
- copy=False)
- # Note: setting the sampling_period to 1 s is arbitrary
- # load the AnalogSignal and parameters into a new Segment
- seg = Segment(file_origin=self._filename,
- index=seg_index,
- **params)
- seg.analogsignals = [sig]
- return seg
|