- """
- Class for reading/writing data from micromed (.trc).
- Inspired by the Matlab code for EEGLAB from Rami K. Niazy.
- Completed with matlab Guillaume BECQ code.
- Author: Samuel Garcia
- """

# from __future__ import unicode_literals is not compatible with numpy.dtype both py2 py3

from .baserawio import (BaseRawIO, _signal_channel_dtype, _unit_channel_dtype,
                        _event_channel_dtype)

import numpy as np

import datetime
import os
import struct
import io
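

# StructFile below is a thin helper around io.BufferedReader: read_f() seeks to an
# optional offset and unpacks one fixed-size binary record with the struct module.
# Format codes used in this reader: 'b' int8, 'h' int16, 'H' uint16, 'i' int32,
# 'I' uint32, '8s' 8-byte string. No byte-order prefix is given, so native byte
# order is used. For example (illustrative only):
#     day, month, year, hour, minute, sec = f.read_f('bbbbbb', offset=128)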
class StructFile(io.BufferedReader):
    def read_f(self, fmt, offset=None):
        if offset is not None:
            self.seek(offset)
        return struct.unpack(fmt, self.read(struct.calcsize(fmt)))


class MicromedRawIO(BaseRawIO):
    """
    Class for reading data from Micromed (.trc) files.
    """
    extensions = ['trc', 'TRC']
    rawmode = 'one-file'

    def __init__(self, filename=''):
        BaseRawIO.__init__(self)
        self.filename = filename
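
    # _parse_header() reads the fixed-offset fields of the .TRC header as done
    # below: patient surname/first name at byte 64, recording date/time at 128,
    # data layout (data start offset, channel count, multiplexer, base rate,
    # bytes per sample) at 138, header version at 175, and the zone descriptor
    # table at 176. These offsets are the ones this reader supports (header version 4).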
    def _parse_header(self):
        with open(self.filename, 'rb') as fid:
            f = StructFile(fid)

            # Name
            f.seek(64)
            surname = f.read(22).strip(b' ')
            firstname = f.read(20).strip(b' ')

            # Date
            day, month, year, hour, minute, sec = f.read_f('bbbbbb', offset=128)
            rec_datetime = datetime.datetime(year + 1900, month, day, hour, minute,
                                             sec)

            Data_Start_Offset, Num_Chan, Multiplexer, Rate_Min, Bytes = f.read_f(
                'IHHHH', offset=138)

            # header version
            header_version, = f.read_f('b', offset=175)
            assert header_version == 4

            # area
            f.seek(176)
            zone_names = ['ORDER', 'LABCOD', 'NOTE', 'FLAGS', 'TRONCA',
                          'IMPED_B', 'IMPED_E', 'MONTAGE',
                          'COMPRESS', 'AVERAGE', 'HISTORY', 'DVIDEO', 'EVENT A',
                          'EVENT B', 'TRIGGER']
            zones = {}
            for zname in zone_names:
                zname2, pos, length = f.read_f('8sII')
                zones[zname] = zname2, pos, length
                assert zname == zname2.decode('ascii').strip(' ')
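
            # `zones` now maps each area name to (raw name, file offset, byte
            # length); those offsets are used for the seeks and memmaps below.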

            # raw signals memmap
            sig_dtype = 'u' + str(Bytes)
            self._raw_signals = np.memmap(self.filename, dtype=sig_dtype, mode='r',
                                          offset=Data_Start_Offset).reshape(-1, Num_Chan)

            # Reading Code Info
            zname2, pos, length = zones['ORDER']
            f.seek(pos)
            code = np.frombuffer(f.read(Num_Chan * 2), dtype='u2')
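
            # Each entry of `code` is, for one acquisition channel, the index of
            # its 128-byte electrode descriptor inside the LABCOD zone; the loop
            # below seeks to that descriptor to read name, ground, ranges and rate.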

            units_code = {-1: 'nV', 0: 'uV', 1: 'mV', 2: 1, 100: 'percent',
                          101: 'dimensionless', 102: 'dimensionless'}

            sig_channels = []
            sig_grounds = []
            for c in range(Num_Chan):
                zname2, pos, length = zones['LABCOD']
                f.seek(pos + code[c] * 128 + 2, 0)

                chan_name = f.read(6).strip(b"\x00").decode('ascii')
                ground = f.read(6).strip(b"\x00").decode('ascii')
                sig_grounds.append(ground)
                logical_min, logical_max, logical_ground, physical_min, physical_max = f.read_f(
                    'iiiii')
                k, = f.read_f('h')
                units = units_code.get(k, 'uV')
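
                # Linear scaling from raw integer samples to physical units:
                # physical = (raw - logical_ground) * gain, where gain is the span
                # of the physical range over the span of the logical range and the
                # offset recentres the logical ground to zero.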
                factor = float(physical_max - physical_min) / float(
                    logical_max - logical_min + 1)
                gain = factor
                offset = -logical_ground * factor

                f.seek(8, 1)
                sampling_rate, = f.read_f('H')
                sampling_rate *= Rate_Min
                chan_id = c
                group_id = 0
                sig_channels.append((chan_name, chan_id, sampling_rate, sig_dtype,
                                     units, gain, offset, group_id))

            sig_channels = np.array(sig_channels, dtype=_signal_channel_dtype)
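
            # Micromed stores each channel rate as a multiple of Rate_Min; this
            # reader only supports recordings where all channels share one rate.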
            assert np.unique(sig_channels['sampling_rate']).size == 1
            self._sampling_rate = float(np.unique(sig_channels['sampling_rate'])[0])

            # Event channels
            event_channels = []
            event_channels.append(('Trigger', '', 'event'))
            event_channels.append(('Note', '', 'event'))
            event_channels.append(('Event A', '', 'epoch'))
            event_channels.append(('Event B', '', 'epoch'))
            event_channels = np.array(event_channels, dtype=_event_channel_dtype)

            # Read trigger and notes
            self._raw_events = []
            ev_dtypes = [('TRIGGER', [('start', 'u4'), ('label', 'u2')]),
                         ('NOTE', [('start', 'u4'), ('label', 'S40')]),
                         ('EVENT A', [('label', 'u4'), ('start', 'u4'), ('stop', 'u4')]),
                         ('EVENT B', [('label', 'u4'), ('start', 'u4'), ('stop', 'u4')]),
                         ]
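
            # Each event zone is memmapped as an array of fixed-size records with
            # the dtypes above; a record is kept only if its start sample is
            # non-zero, not earlier than the first record's start, and inside the
            # recorded signal range.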
            for zname, ev_dtype in ev_dtypes:
                zname2, pos, length = zones[zname]
                dtype = np.dtype(ev_dtype)
                rawevent = np.memmap(self.filename, dtype=dtype, mode='r',
                                     offset=pos, shape=length // dtype.itemsize)

                keep = (rawevent['start'] >= rawevent['start'][0]) & (
                    rawevent['start'] < self._raw_signals.shape[0]) & (
                    rawevent['start'] != 0)
                rawevent = rawevent[keep]
                self._raw_events.append(rawevent)

            # No spikes
            unit_channels = []
            unit_channels = np.array(unit_channels, dtype=_unit_channel_dtype)

            # fill into the header dict
            self.header = {}
            self.header['nb_block'] = 1
            self.header['nb_segment'] = [1]
            self.header['signal_channels'] = sig_channels
            self.header['unit_channels'] = unit_channels
            self.header['event_channels'] = event_channels

            # insert some annotations at the block, segment and channel level
            self._generate_minimal_annotations()
            bl_annotations = self.raw_annotations['blocks'][0]
            seg_annotations = bl_annotations['segments'][0]

            for d in (bl_annotations, seg_annotations):
                d['rec_datetime'] = rec_datetime
                d['firstname'] = firstname
                d['surname'] = surname
                d['header_version'] = header_version

            for c in range(sig_channels.size):
                anasig_an = seg_annotations['signals'][c]
                anasig_an['ground'] = sig_grounds[c]
                channel_an = self.raw_annotations['signal_channels'][c]
                channel_an['ground'] = sig_grounds[c]

    def _source_name(self):
        return self.filename

    def _segment_t_start(self, block_index, seg_index):
        return 0.

    def _segment_t_stop(self, block_index, seg_index):
        t_stop = self._raw_signals.shape[0] / self._sampling_rate
        return t_stop

    def _get_signal_size(self, block_index, seg_index, channel_indexes):
        return self._raw_signals.shape[0]

    def _get_signal_t_start(self, block_index, seg_index, channel_indexes):
        return 0.
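
    # The chunk returned below contains raw unsigned integers straight from the
    # memmap; conversion to physical units relies on the per-channel gain/offset
    # stored in the header (e.g. through BaseRawIO.rescale_signal_raw_to_float).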
    def _get_analogsignal_chunk(self, block_index, seg_index, i_start, i_stop, channel_indexes):
        if channel_indexes is None:
            # None means all channels
            channel_indexes = slice(None)
        raw_signals = self._raw_signals[slice(i_start, i_stop), channel_indexes]
        return raw_signals

    def _spike_count(self, block_index, seg_index, unit_index):
        return 0

    def _event_count(self, block_index, seg_index, event_channel_index):
        n = self._raw_events[event_channel_index].size
        return n

    def _get_event_timestamps(self, block_index, seg_index, event_channel_index, t_start, t_stop):
        raw_event = self._raw_events[event_channel_index]

        if t_start is not None:
            keep = raw_event['start'] >= int(t_start * self._sampling_rate)
            raw_event = raw_event[keep]

        if t_stop is not None:
            keep = raw_event['start'] <= int(t_stop * self._sampling_rate)
            raw_event = raw_event[keep]

        timestamp = raw_event['start']
        # channels 0 and 1 (Trigger, Note) are instantaneous events; channels 2
        # and 3 (Event A, Event B) are epochs with a stop sample.
        if event_channel_index < 2:
            durations = None
        else:
            durations = raw_event['stop'] - raw_event['start']

        try:
            labels = raw_event['label'].astype('U')
        except UnicodeDecodeError:
            # sometimes the conversion does not work: decode with cp1252 as a simple fix
            labels = np.array([e.decode('cp1252') for e in raw_event['label']], dtype='U')

        return timestamp, durations, labels

    def _rescale_event_timestamp(self, event_timestamps, dtype):
        event_times = event_timestamps.astype(dtype) / self._sampling_rate
        return event_times

    def _rescale_epoch_duration(self, raw_duration, dtype):
        # use true division so durations come back as float seconds
        durations = raw_duration.astype(dtype) / self._sampling_rate
        return durations
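

# Example usage (a sketch; the file path is hypothetical and this module is meant
# to be imported through the neo.rawio package, not run directly):
#
#     from neo.rawio import MicromedRawIO
#
#     reader = MicromedRawIO(filename='recording.TRC')
#     reader.parse_header()
#     raw = reader.get_analogsignal_chunk(block_index=0, seg_index=0,
#                                         i_start=0, i_stop=1024,
#                                         channel_indexes=None)
#     sigs = reader.rescale_signal_raw_to_float(raw, dtype='float64')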