1
0

rawbinarysignalrawio.py 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107
  1. # -*- coding: utf-8 -*-
  2. """
  3. Class for reading data in a raw binary interleaved compact file.
  4. Sampling rate, units, number of channels and dtype must be externally known.
  5. This generic format is quite widely used in old acquisition systems
  6. and is quite universal for sharing data.
  7. The write part of this IO is only available at neo.io level with the other
  8. class RawBinarySignalIO
  9. Important release note:
  10. * Since the version neo 0.6.0 and the neo.rawio API,
  11. arguments of the IO (dtype, nb_channel, sampling_rate) must be
  12. given at the __init__ and not at read_segment() because there is
  13. no read_segment() in neo.rawio classes.
  14. Author: Samuel Garcia
  15. """
  16. from __future__ import unicode_literals, print_function, division, absolute_import
  17. from .baserawio import (BaseRawIO, _signal_channel_dtype, _unit_channel_dtype,
  18. _event_channel_dtype)
  19. import numpy as np
  20. import os
  21. import sys
  22. class RawBinarySignalRawIO(BaseRawIO):
  23. extensions = ['raw', '*']
  24. rawmode = 'one-file'
  25. def __init__(self, filename='', dtype='int16', sampling_rate=10000.,
  26. nb_channel=2, signal_gain=1., signal_offset=0., bytesoffset=0):
  27. BaseRawIO.__init__(self)
  28. self.filename = filename
  29. self.dtype = dtype
  30. self.sampling_rate = sampling_rate
  31. self.nb_channel = nb_channel
  32. self.signal_gain = signal_gain
  33. self.signal_offset = signal_offset
  34. self.bytesoffset = bytesoffset
  35. def _source_name(self):
  36. return self.filename
  37. def _parse_header(self):
  38. if os.path.exists(self.filename):
  39. self._raw_signals = np.memmap(self.filename, dtype=self.dtype, mode='r',
  40. offset=self.bytesoffset).reshape(-1, self.nb_channel)
  41. else:
  42. # The the neo.io.RawBinarySignalIO is used for write_segment
  43. self._raw_signals = None
  44. sig_channels = []
  45. if self._raw_signals is not None:
  46. for c in range(self.nb_channel):
  47. name = 'ch{}'.format(c)
  48. chan_id = c
  49. units = ''
  50. group_id = 0
  51. sig_channels.append((name, chan_id, self.sampling_rate, self.dtype,
  52. units, self.signal_gain, self.signal_offset, group_id))
  53. sig_channels = np.array(sig_channels, dtype=_signal_channel_dtype)
  54. # No events
  55. event_channels = []
  56. event_channels = np.array(event_channels, dtype=_event_channel_dtype)
  57. # No spikes
  58. unit_channels = []
  59. unit_channels = np.array(unit_channels, dtype=_unit_channel_dtype)
  60. # fille into header dict
  61. self.header = {}
  62. self.header['nb_block'] = 1
  63. self.header['nb_segment'] = [1]
  64. self.header['signal_channels'] = sig_channels
  65. self.header['unit_channels'] = unit_channels
  66. self.header['event_channels'] = event_channels
  67. # insert some annotation at some place
  68. self._generate_minimal_annotations()
  69. def _segment_t_start(self, block_index, seg_index):
  70. return 0.
  71. def _segment_t_stop(self, block_index, seg_index):
  72. t_stop = self._raw_signals.shape[0] / self.sampling_rate
  73. return t_stop
  74. def _get_signal_size(self, block_index, seg_index, channel_indexes):
  75. return self._raw_signals.shape[0]
  76. def _get_signal_t_start(self, block_index, seg_index, channel_indexes):
  77. return 0.
  78. def _get_analogsignal_chunk(self, block_index, seg_index, i_start, i_stop, channel_indexes):
  79. if channel_indexes is None:
  80. channel_indexes = slice(None)
  81. raw_signals = self._raw_signals[slice(i_start, i_stop), channel_indexes]
  82. return raw_signals