# -*- coding: utf-8 -*-
"""
Class for reading/writing data from micromed (.trc).
Inspired by the Matlab code for EEGLAB from Rami K. Niazy.
Completed with Matlab code from Guillaume BECQ.

Author: Samuel Garcia
"""
from __future__ import print_function, division, absolute_import
# from __future__ import unicode_literals is not compatible with numpy.dtype both py2 py3

from .baserawio import (BaseRawIO, _signal_channel_dtype, _unit_channel_dtype,
                        _event_channel_dtype)

import numpy as np

import datetime
import os
import struct
import io
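

# Small helper around io.BufferedReader: read_f() optionally seeks to an
# absolute offset, then unpacks one struct format string and returns the
# resulting tuple.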
class StructFile(io.BufferedReader):
    def read_f(self, fmt, offset=None):
        if offset is not None:
            self.seek(offset)
        return struct.unpack(fmt, self.read(struct.calcsize(fmt)))
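
# Example (illustrative): f.read_f('IHHHH', offset=138) seeks to byte 138 and
# returns a 5-tuple (one uint32 followed by four uint16 values), as read in
# _parse_header() below.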


class MicromedRawIO(BaseRawIO):
    """
    Class for reading data from micromed (.trc).
    """
    extensions = ['trc', 'TRC']
    rawmode = 'one-file'

    def __init__(self, filename=''):
        BaseRawIO.__init__(self)
        self.filename = filename

    def _parse_header(self):
        with io.open(self.filename, 'rb') as fid:
            f = StructFile(fid)

            # Name
            f.seek(64)
            surname = f.read(22).strip(b' ')
            firstname = f.read(20).strip(b' ')

            # Date
            day, month, year, hour, minute, sec = f.read_f('bbbbbb', offset=128)
            rec_datetime = datetime.datetime(year + 1900, month, day, hour, minute,
                                             sec)
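
            # Fixed-size header fields at byte 138: offset of the data block,
            # number of channels, multiplexer, minimal sampling rate and the
            # number of bytes per sample.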
            Data_Start_Offset, Num_Chan, Multiplexer, Rate_Min, Bytes = f.read_f(
                'IHHHH', offset=138)

            # header version
            header_version, = f.read_f('b', offset=175)
            assert header_version == 4

            # area
            f.seek(176)
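            # Each zone descriptor is 16 bytes: an 8-byte ASCII name followed by
            # a uint32 file offset and a uint32 length ('8sII').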
            zone_names = ['ORDER', 'LABCOD', 'NOTE', 'FLAGS', 'TRONCA',
                          'IMPED_B', 'IMPED_E', 'MONTAGE',
                          'COMPRESS', 'AVERAGE', 'HISTORY', 'DVIDEO', 'EVENT A',
                          'EVENT B', 'TRIGGER']
            zones = {}
            for zname in zone_names:
                zname2, pos, length = f.read_f('8sII')
                zones[zname] = zname2, pos, length
                assert zname == zname2.decode('ascii').strip(' ')

            # raw signals memmap
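            # Samples are stored multiplexed (interleaved across channels), so
            # the flat memmap is reshaped to (n_samples, n_channels).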
            sig_dtype = 'u' + str(Bytes)
            self._raw_signals = np.memmap(self.filename, dtype=sig_dtype, mode='r',
                                          offset=Data_Start_Offset).reshape(-1, Num_Chan)

            # Reading Code Info
            zname2, pos, length = zones['ORDER']
            f.seek(pos)
            code = np.frombuffer(f.read(Num_Chan * 2), dtype='u2')

            units_code = {-1: 'nV', 0: 'uV', 1: 'mV', 2: 1, 100: 'percent',
                          101: 'dimensionless', 102: 'dimensionless'}

            sig_channels = []
            sig_grounds = []
            for c in range(Num_Chan):
                zname2, pos, length = zones['LABCOD']
                f.seek(pos + code[c] * 128 + 2, 0)

                chan_name = f.read(6).strip(b"\x00").decode('ascii')
                ground = f.read(6).strip(b"\x00").decode('ascii')
                sig_grounds.append(ground)
                logical_min, logical_max, logical_ground, physical_min, physical_max = f.read_f(
                    'iiiii')

                k, = f.read_f('h')
                units = units_code.get(k, 'uV')
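
                # Linear scaling: physical = gain * raw + offset, with
                # gain = (physical_max - physical_min) / (logical_max - logical_min + 1)
                # and the zero level given by logical_ground.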
                factor = float(physical_max - physical_min) / float(
                    logical_max - logical_min + 1)
                gain = factor
                offset = -logical_ground * factor

                f.seek(8, 1)
                sampling_rate, = f.read_f('H')
                sampling_rate *= Rate_Min

                chan_id = c
                group_id = 0
                sig_channels.append((chan_name, chan_id, sampling_rate, sig_dtype,
                                     units, gain, offset, group_id))

            sig_channels = np.array(sig_channels, dtype=_signal_channel_dtype)

            assert np.unique(sig_channels['sampling_rate']).size == 1
            self._sampling_rate = float(np.unique(sig_channels['sampling_rate'])[0])

            # Event channels
            event_channels = []
            event_channels.append(('Trigger', '', 'event'))
            event_channels.append(('Note', '', 'event'))
            event_channels.append(('Event A', '', 'epoch'))
            event_channels.append(('Event B', '', 'epoch'))
            event_channels = np.array(event_channels, dtype=_event_channel_dtype)

            # Read trigger and notes
            self._raw_events = []
            ev_dtypes = [('TRIGGER', [('start', 'u4'), ('label', 'u2')]),
                         ('NOTE', [('start', 'u4'), ('label', 'S40')]),
                         ('EVENT A', [('label', 'u4'), ('start', 'u4'), ('stop', 'u4')]),
                         ('EVENT B', [('label', 'u4'), ('start', 'u4'), ('stop', 'u4')]),
                         ]
            for zname, ev_dtype in ev_dtypes:
                zname2, pos, length = zones[zname]
                dtype = np.dtype(ev_dtype)
                rawevent = np.memmap(self.filename, dtype=dtype, mode='r',
                                     offset=pos, shape=length // dtype.itemsize)
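
                # Keep only plausible entries: a non-zero start sample that is not
                # before the first listed event and falls inside the recorded signal.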
                keep = (rawevent['start'] >= rawevent['start'][0]) & (
                    rawevent['start'] < self._raw_signals.shape[0]) & (
                    rawevent['start'] != 0)
                rawevent = rawevent[keep]
                self._raw_events.append(rawevent)

            # No spikes
            unit_channels = []
            unit_channels = np.array(unit_channels, dtype=_unit_channel_dtype)

            # fill into header dict
            self.header = {}
            self.header['nb_block'] = 1
            self.header['nb_segment'] = [1]
            self.header['signal_channels'] = sig_channels
            self.header['unit_channels'] = unit_channels
            self.header['event_channels'] = event_channels

            # insert some annotations at the block and segment level
            self._generate_minimal_annotations()
            bl_annotations = self.raw_annotations['blocks'][0]
            seg_annotations = bl_annotations['segments'][0]

            for d in (bl_annotations, seg_annotations):
                d['rec_datetime'] = rec_datetime
                d['firstname'] = firstname
                d['surname'] = surname
                d['header_version'] = header_version

            for c in range(sig_channels.size):
                anasig_an = seg_annotations['signals'][c]
                anasig_an['ground'] = sig_grounds[c]
                channel_an = self.raw_annotations['signal_channels'][c]
                channel_an['ground'] = sig_grounds[c]

    def _source_name(self):
        return self.filename

    def _segment_t_start(self, block_index, seg_index):
        return 0.

    def _segment_t_stop(self, block_index, seg_index):
        t_stop = self._raw_signals.shape[0] / self._sampling_rate
        return t_stop

    def _get_signal_size(self, block_index, seg_index, channel_indexes):
        return self._raw_signals.shape[0]

    def _get_signal_t_start(self, block_index, seg_index, channel_indexes):
        return 0.

    def _get_analogsignal_chunk(self, block_index, seg_index, i_start, i_stop, channel_indexes):
        if channel_indexes is None:
            # channel_indexes is None here, so slice(None) selects all channels
            channel_indexes = slice(channel_indexes)
        raw_signals = self._raw_signals[slice(i_start, i_stop), channel_indexes]
        return raw_signals

    def _spike_count(self, block_index, seg_index, unit_index):
        return 0

    def _event_count(self, block_index, seg_index, event_channel_index):
        n = self._raw_events[event_channel_index].size
        return n

    def _get_event_timestamps(self, block_index, seg_index, event_channel_index, t_start, t_stop):
        raw_event = self._raw_events[event_channel_index]

        if t_start is not None:
            keep = raw_event['start'] >= int(t_start * self._sampling_rate)
            raw_event = raw_event[keep]

        if t_stop is not None:
            keep = raw_event['start'] <= int(t_stop * self._sampling_rate)
            raw_event = raw_event[keep]

        timestamp = raw_event['start']
        # Trigger and Note channels carry instantaneous events; Event A/B are
        # epochs with explicit start/stop samples.
        if event_channel_index < 2:
            durations = None
        else:
            durations = raw_event['stop'] - raw_event['start']

        labels = raw_event['label'].astype('U')

        return timestamp, durations, labels

    def _rescale_event_timestamp(self, event_timestamps, dtype):
        event_times = event_timestamps.astype(dtype) / self._sampling_rate
        return event_times

    def _rescale_epoch_duration(self, raw_duration, dtype):
        durations = raw_duration.astype(dtype) / self._sampling_rate
        return durations
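

# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative, not part of the original module). It
# assumes a Micromed recording at 'my_recording.TRC' (hypothetical path) and
# goes through the public neo.rawio entry point rather than this private module:
#
#     from neo.rawio import MicromedRawIO
#
#     reader = MicromedRawIO(filename='my_recording.TRC')
#     reader.parse_header()
#     raw_chunk = reader.get_analogsignal_chunk(block_index=0, seg_index=0,
#                                               i_start=0, i_stop=1024,
#                                               channel_indexes=None)
#     float_chunk = reader.rescale_signal_raw_to_float(raw_chunk, dtype='float64')
# ---------------------------------------------------------------------------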