Scheduled service maintenance on November 22


On Friday, November 22, 2024, between 06:00 CET and 18:00 CET, GIN services will undergo planned maintenance. Extended service interruptions should be expected. We will try to keep downtimes to a minimum, but recommend that users avoid critical tasks, large data uploads, or DOI requests during this time.

We apologize for any inconvenience.

brainwaredamio.py 8.9 KB


  1. # -*- coding: utf-8 -*-
  2. '''
  3. Class for reading from Brainware DAM files
  4. DAM files are binary files for holding raw data. They are broken up into a
  5. sequence of Segments, each containing a single raw trace and parameters.
  6. The DAM file does NOT contain a sampling rate, nor can it be reliably
  7. calculated from any of the parameters. You can calculate it from
  8. the "sweep length" attribute if it is present, but it isn't always present.
  9. It is more reliable to get it from the corresponding SRC file or F32 file if
  10. you have one.
  11. The DAM file also does not divide up data into Blocks, so only a single
  12. Block is returned.
  13. Brainware was developed by Dr. Jan Schnupp and is available from
  14. Tucker Davis Technologies, Inc.
  15. http://www.tdt.com/downloads.htm
  16. Neither Dr. Jan Schnupp nor Tucker Davis Technologies, Inc. had any part in the
  17. development of this code
  18. The code is implemented with the permission of Dr. Jan Schnupp
  19. Author: Todd Jennings
  20. '''
  21. # needed for python 3 compatibility
  22. from __future__ import absolute_import, division, print_function
  23. # import needed core python modules
  24. import os
  25. import os.path
  26. # numpy and quantities are already required by neo
  27. import numpy as np
  28. import quantities as pq
  29. # needed core neo modules
  30. from neo.core import (AnalogSignal, Block,
  31. ChannelIndex, Segment)
  32. # need to subclass BaseIO
  33. from neo.io.baseio import BaseIO
  34. class BrainwareDamIO(BaseIO):
  35. """
  36. Class for reading Brainware raw data files with the extension '.dam'.
  37. The read_block method returns the first Block of the file. It will
  38. automatically close the file after reading.
  39. The read method is the same as read_block.
  40. Note:
  41. The file format does not contain a sampling rate. The sampling rate
  42. is set to 1 Hz, but this is arbitrary. If you have a corresponding .src
  43. or .f32 file, you can get the sampling rate from that. It may also be
  44. possible to infer it from the attributes, such as "sweep length", if
  45. present.
  46. Usage:
  47. >>> from neo.io.brainwaredamio import BrainwareDamIO
  48. >>> damfile = BrainwareDamIO(filename='multi_500ms_mulitrep_ch1.dam')
  49. >>> blk1 = damfile.read()
  50. >>> blk2 = damfile.read_block()
  51. >>> print blk1.segments
  52. >>> print blk1.segments[0].analogsignals
  53. >>> print blk1.units
  54. >>> print blk1.units[0].name
  55. >>> print blk2
  56. >>> print blk2[0].segments
  57. """
  58. is_readable = True # This class can only read data
  59. is_writable = False # write is not supported
  60. # This class is able to directly or indirectly handle the following objects
  61. # You can notice that this greatly simplifies the full Neo object hierarchy
  62. supported_objects = [Block, ChannelIndex,
  63. Segment, AnalogSignal]
  64. readable_objects = [Block]
  65. writeable_objects = []
  66. has_header = False
  67. is_streameable = False
  68. # This is for GUI stuff: a definition for parameters when reading.
  69. # This dict should be keyed by object (`Block`). Each entry is a list
  70. # of tuple. The first entry in each tuple is the parameter name. The
  71. # second entry is a dict with keys 'value' (for default value),
  72. # and 'label' (for a descriptive name).
  73. # Note that if the highest-level object requires parameters,
  74. # common_io_test will be skipped.
  75. read_params = {Block: []}
  76. # do not support write so no GUI stuff
  77. write_params = None
  78. name = 'Brainware DAM File'
  79. extensions = ['dam']
  80. mode = 'file'
  81. def __init__(self, filename=None):
  82. '''
  83. Arguments:
  84. filename: the filename
  85. '''
  86. BaseIO.__init__(self)
  87. self._path = filename
  88. self._filename = os.path.basename(filename)
  89. self._fsrc = None
  90. def read(self, lazy=False, cascade=True, **kargs):
  91. '''
  92. Reads raw data file "fname" generated with BrainWare
  93. '''
  94. return self.read_block(lazy=lazy, cascade=cascade)
  95. def read_block(self, lazy=False, cascade=True, **kargs):
  96. '''
  97. Reads a block from the raw data file "fname" generated
  98. with BrainWare
  99. '''
  100. # there are no keyargs implemented to so far. If someone tries to pass
  101. # them they are expecting them to do something or making a mistake,
  102. # neither of which should pass silently
  103. if kargs:
  104. raise NotImplementedError('This method does not have any '
  105. 'arguments implemented yet')
  106. self._fsrc = None
  107. block = Block(file_origin=self._filename)
  108. # if we aren't doing cascade, don't load anything
  109. if not cascade:
  110. return block
  111. # create the objects to store other objects
  112. chx = ChannelIndex(file_origin=self._filename,
  113. channel_ids=np.array([1]),
  114. index=np.array([0]),
  115. channel_names=np.array(['Chan1'], dtype='S'))
  116. # load objects into their containers
  117. block.channel_indexes.append(chx)
  118. # open the file
  119. with open(self._path, 'rb') as fobject:
  120. # while the file is not done keep reading segments
  121. while True:
  122. seg = self._read_segment(fobject, lazy)
  123. # if there are no more Segments, stop
  124. if not seg:
  125. break
  126. # store the segment and signals
  127. seg.analogsignals[0].channel_index = chx
  128. block.segments.append(seg)
  129. # remove the file object
  130. self._fsrc = None
  131. block.create_many_to_one_relationship()
  132. return block
  133. # -------------------------------------------------------------------------
  134. # -------------------------------------------------------------------------
  135. # IMPORTANT!!!
  136. # These are private methods implementing the internal reading mechanism.
  137. # Due to the way BrainWare DAM files are structured, they CANNOT be used
  138. # on their own. Calling these manually will almost certainly alter your
  139. # position in the file in an unrecoverable manner, whether they throw
  140. # an exception or not.
  141. # -------------------------------------------------------------------------
  142. # -------------------------------------------------------------------------
  143. def _read_segment(self, fobject, lazy):
  144. '''
  145. Read a single segment with a single analogsignal
  146. Returns the segment or None if there are no more segments
  147. '''
  148. try:
  149. # float64 -- start time of the AnalogSignal
  150. t_start = np.fromfile(fobject, dtype=np.float64, count=1)[0]
  151. except IndexError:
  152. # if there are no more Segments, return
  153. return False
  154. # int16 -- index of the stimulus parameters
  155. seg_index = np.fromfile(fobject, dtype=np.int16, count=1)[0].tolist()
  156. # int16 -- number of stimulus parameters
  157. numelements = np.fromfile(fobject, dtype=np.int16, count=1)[0]
  158. # read the name strings for the stimulus parameters
  159. paramnames = []
  160. for _ in range(numelements):
  161. # unit8 -- the number of characters in the string
  162. numchars = np.fromfile(fobject, dtype=np.uint8, count=1)[0]
  163. # char * numchars -- a single name string
  164. name = np.fromfile(fobject, dtype=np.uint8, count=numchars)
  165. # exclude invalid characters
  166. name = str(name[name >= 32].view('c').tostring())
  167. # add the name to the list of names
  168. paramnames.append(name)
  169. # float32 * numelements -- the values for the stimulus parameters
  170. paramvalues = np.fromfile(fobject, dtype=np.float32, count=numelements)
  171. # combine parameter names and the parameters as a dict
  172. params = dict(zip(paramnames, paramvalues))
  173. # int32 -- the number elements in the AnalogSignal
  174. numpts = np.fromfile(fobject, dtype=np.int32, count=1)[0]
  175. # int16 * numpts -- the AnalogSignal itself
  176. signal = np.fromfile(fobject, dtype=np.int16, count=numpts)
  177. # handle lazy loading
  178. if lazy:
  179. sig = AnalogSignal([], t_start=t_start*pq.d,
  180. file_origin=self._filename,
  181. sampling_period=1.*pq.s,
  182. units=pq.mV,
  183. dtype=np.float)
  184. sig.lazy_shape = len(signal)
  185. else:
  186. sig = AnalogSignal(signal.astype(np.float)*pq.mV,
  187. t_start=t_start*pq.d,
  188. file_origin=self._filename,
  189. sampling_period=1.*pq.s,
  190. copy=False)
  191. # Note: setting the sampling_period to 1 s is arbitrary
  192. # load the AnalogSignal and parameters into a new Segment
  193. seg = Segment(file_origin=self._filename,
  194. index=seg_index,
  195. **params)
  196. seg.analogsignals = [sig]
  197. return seg