micromedio.py 7.2 KB


  # -*- coding: utf-8 -*-
  """
  Class for reading data from Micromed (.trc) files.

  Inspired by the Matlab code for EEGLAB from Rami K. Niazy.
  Completed with Matlab code from Guillaume BECQ.

  Supported : Read

  Author: sgarcia
  """
  9. import datetime
  10. import os
  11. import struct
  # The ``file`` builtin no longer exists in Python 3. When the name is
  # missing, alias it to io.BufferedReader so that StructFile can still
  # subclass a binary file type.
  try:
      file
  except NameError:
      import io
      file = io.BufferedReader
  18. import numpy as np
  19. import quantities as pq
  20. from neo.io.baseio import BaseIO
  21. from neo.core import Segment, AnalogSignal, Epoch, Event
  class StructFile(file):
      """Binary file object that can unpack ``struct`` records in place.

      On Python 2 the base class is the builtin ``file``. On Python 3 the
      module aliases ``file`` to ``io.BufferedReader``, whose constructor
      expects an already-open raw stream rather than a path; ``__init__``
      bridges that difference so ``StructFile(path, 'rb')`` works on both.
      """

      def __init__(self, name, *args, **kwargs):
          """Open the file.

          Arguments:
              name : a path (or file descriptor) to open, or an already-open
                  binary stream to wrap.
              *args, **kwargs : forwarded to the base class constructor.
                  When ``name`` is a path and the base class is
                  ``io.BufferedReader``, the first positional argument (or
                  the ``mode`` keyword, default ``'rb'``) is the open mode.
          """
          # Imported locally: the module only imports ``io`` on Python 3.
          import io
          is_path = (isinstance(name, (str, bytes, int))
                     or hasattr(name, '__fspath__'))
          if issubclass(file, io.BufferedReader) and is_path:
              mode = args[0] if args else kwargs.pop('mode', 'rb')
              raw = io.FileIO(name, mode)
              try:
                  file.__init__(self, raw, *args[1:], **kwargs)
              except Exception:
                  # Do not leak the descriptor if the wrapper rejects the
                  # remaining arguments.
                  raw.close()
                  raise
          else:
              # Builtin ``file`` (Python 2) opens the path itself; a stream
              # argument is handed to the base class untouched.
              file.__init__(self, name, *args, **kwargs)

      def read_f(self, fmt):
          """Read ``struct.calcsize(fmt)`` bytes and unpack them with *fmt*.

          Returns the tuple produced by ``struct.unpack``. ``struct.error``
          is raised if fewer bytes than required could be read.
          """
          return struct.unpack(fmt, self.read(struct.calcsize(fmt)))
  class MicromedIO(BaseIO):
      """
      Class for reading data from micromed (.trc).

      Usage:
          >>> from neo import io
          >>> r = io.MicromedIO(filename='File_micromed_1.TRC')
          >>> seg = r.read_segment(lazy=False, cascade=True)
          >>> print seg.analogsignals # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE
          [<AnalogSignal(array([ -1.77246094e+02, -2.24707031e+02,
          -2.66015625e+02, ...
      """
      # Capabilities advertised to the neo IO framework: read-only.
      is_readable = True
      is_writable = False

      supported_objects = [Segment, AnalogSignal, Event, Epoch]
      readable_objects = [Segment]
      writeable_objects = []

      has_header = False
      is_streameable = False
      read_params = {Segment: []}
      write_params = None

      name = None
      extensions = ['TRC']

      mode = 'file'

      def __init__(self, filename=None):
          """
          This class read a micromed TRC file.

          Arguments:
              filename : the filename to read
          """
          BaseIO.__init__(self)
          self.filename = filename

      def read_segment(self, cascade=True, lazy=False, ):
          """
          Read the file into a single Segment.

          Arguments:
              cascade : if False, return the Segment with only its name,
                  file_origin and annotations (surname, firstname,
                  rec_datetime); no signals, events or epochs are read.
              lazy : if True, the sample data is not loaded; signals are
                  created empty and events/epochs carry only a
                  ``lazy_shape``.

          Returns the populated Segment.

          Raises AssertionError if the header version byte is not 4.
          """
          f = StructFile(self.filename, 'rb')
          try:
              return self._read_segment(f, cascade, lazy)
          finally:
              # Everything needed has been copied into memory, so the
              # handle is released on every exit path (including errors
              # and the early ``cascade=False`` return).
              f.close()

      def _read_segment(self, f, cascade, lazy):
          """Parse the open StructFile *f*; see read_segment for arguments."""
          # Name: two fixed-width fields padded with trailing spaces.
          # rstrip is safe on empty / all-space fields.
          f.seek(64, 0)
          surname = f.read(22).rstrip(b' ')
          firstname = f.read(20).rstrip(b' ')

          # Date: six signed bytes, year stored as an offset from 1900.
          f.seek(128, 0)
          day, month, year, hour, minute, sec = f.read_f('bbbbbb')
          rec_datetime = datetime.datetime(year + 1900, month, day, hour,
                                           minute, sec)

          f.seek(138, 0)
          (data_start_offset, num_chan, _multiplexer, rate_min,
           sample_bytes) = f.read_f('IHHHH')

          # Header version: only version 4 is handled. Raised explicitly
          # (instead of ``assert``) so the check survives ``python -O``.
          f.seek(175, 0)
          header_version, = f.read_f('b')
          if header_version != 4:
              raise AssertionError(
                  'unsupported header version: %r (expected 4)'
                  % (header_version,))

          seg = Segment(name=firstname + b' ' + surname,
                        file_origin=os.path.basename(self.filename))
          seg.annotate(surname=surname)
          seg.annotate(firstname=firstname)
          seg.annotate(rec_datetime=rec_datetime)

          if not cascade:
              return seg

          # Area table: one (name, position, length) record per zone, in
          # this fixed order, starting at byte 176.
          f.seek(176, 0)
          zone_names = ['ORDER', 'LABCOD', 'NOTE', 'FLAGS', 'TRONCA',
                        'IMPED_B', 'IMPED_E', 'MONTAGE',
                        'COMPRESS', 'AVERAGE', 'HISTORY', 'DVIDEO',
                        'EVENT A', 'EVENT B', 'TRIGGER']
          zones = {}
          for zname in zone_names:
              zones[zname] = f.read_f('8sII')

          # Raw data: unsigned integers of ``sample_bytes`` bytes, reshaped
          # to (samples, channels).
          rawdata = None
          if not lazy:
              f.seek(data_start_offset, 0)
              rawdata = np.frombuffer(f.read(),
                                      dtype='u' + str(sample_bytes))
              # Floor division: a float shape is rejected by reshape.
              rawdata = rawdata.reshape((rawdata.size // num_chan,
                                         num_chan))

          # Code info: one u2 entry per channel, used below to locate the
          # channel's record inside the LABCOD zone.
          _, pos, _ = zones['ORDER']
          f.seek(pos, 0)
          code = np.fromfile(f, dtype='u2', count=num_chan)

          # Unit code -> unit; unknown codes fall back to microvolts.
          # NOTE(review): code 2 maps to the plain number 1 rather than a
          # quantities unit -- confirm this is intended.
          units = {-1: pq.nano * pq.V, 0: pq.uV, 1: pq.mV, 2: 1,
                   100: pq.percent, 101: pq.dimensionless,
                   102: pq.dimensionless}

          labcod_pos = zones['LABCOD'][1]
          for c in range(num_chan):
              # int(): keep the offset arithmetic out of uint16, where
              # ``code[c] * 128`` could wrap around.
              f.seek(labcod_pos + int(code[c]) * 128 + 2, 0)

              label = f.read(6).strip(b"\x00")
              ground = f.read(6).strip(b"\x00")
              (logical_min, logical_max, logical_ground, physical_min,
               physical_max) = f.read_f('iiiii')
              k, = f.read_f('h')
              unit = units.get(k, pq.uV)

              f.seek(8, 1)
              rate, = f.read_f('H')
              sampling_rate = rate * rate_min * pq.Hz

              if lazy:
                  signal = [] * unit
              else:
                  # Scale raw counts from the logical to the physical
                  # range.
                  factor = float(physical_max - physical_min) / float(
                      logical_max - logical_min + 1)
                  signal = (rawdata[:, c].astype('f')
                            - logical_ground) * factor * unit

              ana_sig = AnalogSignal(signal, sampling_rate=sampling_rate,
                                     name=label, channel_index=c)
              if lazy:
                  ana_sig.lazy_shape = None
              ana_sig.annotate(ground=ground)

              seg.analogsignals.append(ana_sig)

          # Sample positions of events/epochs are converted to seconds
          # with the mean sampling rate over all channels.
          sampling_rate = np.mean(
              [ana_sig.sampling_rate
               for ana_sig in seg.analogsignals]) * pq.Hz

          # Read trigger and notes
          for zname, label_dtype in [('TRIGGER', 'u2'), ('NOTE', 'S40')]:
              _, pos, length = zones[zname]
              f.seek(pos, 0)
              triggers = np.frombuffer(
                  f.read(length),
                  dtype=[('pos', 'u4'), ('label', label_dtype)])
              ev_name = zname[0] + zname[1:].lower()
              if lazy:
                  ea = Event(name=ev_name)
                  ea.lazy_shape = triggers.size
              else:
                  # An empty zone has no first entry to compare against.
                  if triggers.size:
                      positions = triggers['pos']
                      keep = ((positions >= positions[0])
                              & (positions < rawdata.shape[0])
                              & (positions != 0))
                      triggers = triggers[keep]
                  ea = Event(name=ev_name,
                             labels=triggers['label'].astype('S'),
                             times=(triggers['pos']
                                    / sampling_rate).rescale('s'))
              seg.events.append(ea)

          # Read Event A and B
          # Not so well tested
          for zname in ['EVENT A', 'EVENT B']:
              _, pos, length = zones[zname]
              f.seek(pos, 0)
              epochs = np.frombuffer(
                  f.read(length),
                  dtype=[('label', 'u4'), ('start', 'u4'),
                         ('stop', 'u4')])
              ep_name = zname[0] + zname[1:].lower()
              if lazy:
                  ep = Epoch(name=ep_name)
                  # Size of this zone's own records (not the triggers').
                  ep.lazy_shape = epochs.size
              else:
                  keep = ((epochs['start'] > 0)
                          & (epochs['start'] < rawdata.shape[0])
                          & (epochs['stop'] < rawdata.shape[0]))
                  epochs = epochs[keep]
                  ep = Epoch(name=ep_name,
                             labels=epochs['label'].astype('S'),
                             times=(epochs['start']
                                    / sampling_rate).rescale('s'),
                             durations=((epochs['stop'] - epochs['start'])
                                        / sampling_rate).rescale('s'))
              seg.epochs.append(ep)

          seg.create_many_to_one_relationship()
          return seg