micromedrawio.py 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234
  1. """
  2. Class for reading/writing data from micromed (.trc).
  3. Inspired by the Matlab code for EEGLAB from Rami K. Niazy.
  4. Completed with matlab Guillaume BECQ code.
  5. Author: Samuel Garcia
  6. """
  7. # from __future__ import unicode_literals is not compatible with numpy.dtype both py2 py3
  8. from .baserawio import (BaseRawIO, _signal_channel_dtype, _unit_channel_dtype,
  9. _event_channel_dtype)
  10. import numpy as np
  11. import datetime
  12. import os
  13. import struct
  14. import io
  15. class StructFile(io.BufferedReader):
  16. def read_f(self, fmt, offset=None):
  17. if offset is not None:
  18. self.seek(offset)
  19. return struct.unpack(fmt, self.read(struct.calcsize(fmt)))
  20. class MicromedRawIO(BaseRawIO):
  21. """
  22. Class for reading data from micromed (.trc).
  23. """
  24. extensions = ['trc', 'TRC']
  25. rawmode = 'one-file'
  26. def __init__(self, filename=''):
  27. BaseRawIO.__init__(self)
  28. self.filename = filename
  29. def _parse_header(self):
  30. with open(self.filename, 'rb') as fid:
  31. f = StructFile(fid)
  32. # Name
  33. f.seek(64)
  34. surname = f.read(22).strip(b' ')
  35. firstname = f.read(20).strip(b' ')
  36. # Date
  37. day, month, year, hour, minute, sec = f.read_f('bbbbbb', offset=128)
  38. rec_datetime = datetime.datetime(year + 1900, month, day, hour, minute,
  39. sec)
  40. Data_Start_Offset, Num_Chan, Multiplexer, Rate_Min, Bytes = f.read_f(
  41. 'IHHHH', offset=138)
  42. # header version
  43. header_version, = f.read_f('b', offset=175)
  44. assert header_version == 4
  45. # area
  46. f.seek(176)
  47. zone_names = ['ORDER', 'LABCOD', 'NOTE', 'FLAGS', 'TRONCA',
  48. 'IMPED_B', 'IMPED_E', 'MONTAGE',
  49. 'COMPRESS', 'AVERAGE', 'HISTORY', 'DVIDEO', 'EVENT A',
  50. 'EVENT B', 'TRIGGER']
  51. zones = {}
  52. for zname in zone_names:
  53. zname2, pos, length = f.read_f('8sII')
  54. zones[zname] = zname2, pos, length
  55. assert zname == zname2.decode('ascii').strip(' ')
  56. # raw signals memmap
  57. sig_dtype = 'u' + str(Bytes)
  58. self._raw_signals = np.memmap(self.filename, dtype=sig_dtype, mode='r',
  59. offset=Data_Start_Offset).reshape(-1, Num_Chan)
  60. # Reading Code Info
  61. zname2, pos, length = zones['ORDER']
  62. f.seek(pos)
  63. code = np.frombuffer(f.read(Num_Chan * 2), dtype='u2')
  64. units_code = {-1: 'nV', 0: 'uV', 1: 'mV', 2: 1, 100: 'percent',
  65. 101: 'dimensionless', 102: 'dimensionless'}
  66. sig_channels = []
  67. sig_grounds = []
  68. for c in range(Num_Chan):
  69. zname2, pos, length = zones['LABCOD']
  70. f.seek(pos + code[c] * 128 + 2, 0)
  71. chan_name = f.read(6).strip(b"\x00").decode('ascii')
  72. ground = f.read(6).strip(b"\x00").decode('ascii')
  73. sig_grounds.append(ground)
  74. logical_min, logical_max, logical_ground, physical_min, physical_max = f.read_f(
  75. 'iiiii')
  76. k, = f.read_f('h')
  77. units = units_code.get(k, 'uV')
  78. factor = float(physical_max - physical_min) / float(
  79. logical_max - logical_min + 1)
  80. gain = factor
  81. offset = -logical_ground * factor
  82. f.seek(8, 1)
  83. sampling_rate, = f.read_f('H')
  84. sampling_rate *= Rate_Min
  85. chan_id = c
  86. group_id = 0
  87. sig_channels.append((chan_name, chan_id, sampling_rate, sig_dtype,
  88. units, gain, offset, group_id))
  89. sig_channels = np.array(sig_channels, dtype=_signal_channel_dtype)
  90. assert np.unique(sig_channels['sampling_rate']).size == 1
  91. self._sampling_rate = float(np.unique(sig_channels['sampling_rate'])[0])
  92. # Event channels
  93. event_channels = []
  94. event_channels.append(('Trigger', '', 'event'))
  95. event_channels.append(('Note', '', 'event'))
  96. event_channels.append(('Event A', '', 'epoch'))
  97. event_channels.append(('Event B', '', 'epoch'))
  98. event_channels = np.array(event_channels, dtype=_event_channel_dtype)
  99. # Read trigger and notes
  100. self._raw_events = []
  101. ev_dtypes = [('TRIGGER', [('start', 'u4'), ('label', 'u2')]),
  102. ('NOTE', [('start', 'u4'), ('label', 'S40')]),
  103. ('EVENT A', [('label', 'u4'), ('start', 'u4'), ('stop', 'u4')]),
  104. ('EVENT B', [('label', 'u4'), ('start', 'u4'), ('stop', 'u4')]),
  105. ]
  106. for zname, ev_dtype in ev_dtypes:
  107. zname2, pos, length = zones[zname]
  108. dtype = np.dtype(ev_dtype)
  109. rawevent = np.memmap(self.filename, dtype=dtype, mode='r',
  110. offset=pos, shape=length // dtype.itemsize)
  111. keep = (rawevent['start'] >= rawevent['start'][0]) & (
  112. rawevent['start'] < self._raw_signals.shape[0]) & (
  113. rawevent['start'] != 0)
  114. rawevent = rawevent[keep]
  115. self._raw_events.append(rawevent)
  116. # No spikes
  117. unit_channels = []
  118. unit_channels = np.array(unit_channels, dtype=_unit_channel_dtype)
  119. # fille into header dict
  120. self.header = {}
  121. self.header['nb_block'] = 1
  122. self.header['nb_segment'] = [1]
  123. self.header['signal_channels'] = sig_channels
  124. self.header['unit_channels'] = unit_channels
  125. self.header['event_channels'] = event_channels
  126. # insert some annotation at some place
  127. self._generate_minimal_annotations()
  128. bl_annotations = self.raw_annotations['blocks'][0]
  129. seg_annotations = bl_annotations['segments'][0]
  130. for d in (bl_annotations, seg_annotations):
  131. d['rec_datetime'] = rec_datetime
  132. d['firstname'] = firstname
  133. d['surname'] = surname
  134. d['header_version'] = header_version
  135. for c in range(sig_channels.size):
  136. anasig_an = seg_annotations['signals'][c]
  137. anasig_an['ground'] = sig_grounds[c]
  138. channel_an = self.raw_annotations['signal_channels'][c]
  139. channel_an['ground'] = sig_grounds[c]
  140. def _source_name(self):
  141. return self.filename
  142. def _segment_t_start(self, block_index, seg_index):
  143. return 0.
  144. def _segment_t_stop(self, block_index, seg_index):
  145. t_stop = self._raw_signals.shape[0] / self._sampling_rate
  146. return t_stop
  147. def _get_signal_size(self, block_index, seg_index, channel_indexes):
  148. return self._raw_signals.shape[0]
  149. def _get_signal_t_start(self, block_index, seg_index, channel_indexes):
  150. return 0.
  151. def _get_analogsignal_chunk(self, block_index, seg_index, i_start, i_stop, channel_indexes):
  152. if channel_indexes is None:
  153. channel_indexes = slice(channel_indexes)
  154. raw_signals = self._raw_signals[slice(i_start, i_stop), channel_indexes]
  155. return raw_signals
  156. def _spike_count(self, block_index, seg_index, unit_index):
  157. return 0
  158. def _event_count(self, block_index, seg_index, event_channel_index):
  159. n = self._raw_events[event_channel_index].size
  160. return n
  161. def _get_event_timestamps(self, block_index, seg_index, event_channel_index, t_start, t_stop):
  162. raw_event = self._raw_events[event_channel_index]
  163. if t_start is not None:
  164. keep = raw_event['start'] >= int(t_start * self._sampling_rate)
  165. raw_event = raw_event[keep]
  166. if t_stop is not None:
  167. keep = raw_event['start'] <= int(t_stop * self._sampling_rate)
  168. raw_event = raw_event[keep]
  169. timestamp = raw_event['start']
  170. if event_channel_index < 2:
  171. durations = None
  172. else:
  173. durations = raw_event['stop'] - raw_event['start']
  174. try:
  175. labels = raw_event['label'].astype('U')
  176. except UnicodeDecodeError:
  177. # sometimes the conversion do not work : here a simple fix
  178. labels = np.array([e.decode('cp1252') for e in raw_event['label']], dtype='U')
  179. return timestamp, durations, labels
  180. def _rescale_event_timestamp(self, event_timestamps, dtype):
  181. event_times = event_timestamps.astype(dtype) / self._sampling_rate
  182. return event_times
  183. def _rescale_epoch_duration(self, raw_duration, dtype):
  184. durations = raw_duration.astype(dtype) // self._sampling_rate
  185. return durations