Scheduled service maintenance on November 22


On Friday, November 22, 2024, between 06:00 CET and 18:00 CET, GIN services will undergo planned maintenance. Extended service interruptions should be expected. We will try to keep downtimes to a minimum, but recommend that users avoid critical tasks, large data uploads, or DOI requests during this time.

We apologize for any inconvenience.

spike2rawio.py 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659
# -*- coding: utf-8 -*-
"""
Class for reading data in CED Spike2 files (.smr).

This code is based on:
- sonpy, written by Antonio Gonzalez <Antonio.Gonzalez@cantab.net>
  Available here ::
  http://www.neuro.ki.se/broberger/

and sonpy comes from:
- SON Library 2.0 for MATLAB, written by Malcolm Lidierth at
  King's College London.
  See http://www.kcl.ac.uk/depsta/biomedical/cfnr/lidierth.html

This IO supports old (<v6) and new files (>v7) of spike2.

Author: Samuel Garcia
"""
  15. from __future__ import print_function, division, absolute_import
  16. # from __future__ import unicode_literals is not compatible with numpy.dtype both py2 py3
  17. from .baserawio import (BaseRawIO, _signal_channel_dtype, _unit_channel_dtype,
  18. _event_channel_dtype)
  19. import numpy as np
  20. from collections import OrderedDict
class Spike2RawIO(BaseRawIO):
    """
    RawIO reader for CED Spike2 files (.smr).

    Parameters (see ``__init__``):
      filename: path to the .smr file.
      take_ideal_sampling_rate: if True, use the 'ideal_rate' value stored in
        each channel header instead of the rate computed from the sample
        interval (see ``_parse_header``).
      ced_units: if True, spikes on a channel are split into units according
        to the low byte of their marker field; otherwise all spikes of a
        channel are grouped into one unit ('all').
    """
    extensions = ['smr']
    rawmode = 'one-file'

    def __init__(self, filename='', take_ideal_sampling_rate=False, ced_units=True):
        BaseRawIO.__init__(self)
        self.filename = filename
        self.take_ideal_sampling_rate = take_ideal_sampling_rate
        self.ced_units = ced_units
    def _parse_header(self):
        """
        Parse the .smr file: global header, per-channel headers, data-block
        chains, segment boundaries (recording pauses), and fill ``self.header``
        plus the minimal annotations.
        """
        # get header info and channel_info
        with open(self.filename, 'rb') as fid:
            self._global_info = read_as_dict(fid, headerDescription)
            info = self._global_info
            if info['system_id'] < 6:
                # old files do not store these fields; use fixed defaults
                info['dtime_base'] = 1e-6
                info['datetime_detail'] = 0
                info['datetime_year'] = 0
            # factor converting integer ticks to seconds
            self._time_factor = info['us_per_time'] * info['dtime_base']

            self._channel_infos = []
            for chan_id in range(info['channels']):
                # each channel header is 140 bytes, starting at offset 512
                fid.seek(512 + 140 * chan_id)
                chan_info = read_as_dict(fid, channelHeaderDesciption1)

                # kind-dependent extra fields follow the common header
                if chan_info['kind'] in [1, 6]:
                    dt = [('scale', 'f4'), ('offset', 'f4'), ('unit', 'S6'), ]
                    chan_info.update(read_as_dict(fid, dt))
                elif chan_info['kind'] in [7, 9]:
                    dt = [('min', 'f4'), ('max', 'f4'), ('unit', 'S6'), ]
                    chan_info.update(read_as_dict(fid, dt))
                elif chan_info['kind'] in [4]:
                    dt = [('init_low', 'u1'), ('next_low', 'u1'), ]
                    chan_info.update(read_as_dict(fid, dt))

                if chan_info['kind'] in [1, 6, 7, 9]:
                    # sampled channels carry one more field whose meaning
                    # depends on the file version
                    if info['system_id'] < 6:
                        chan_info.update(read_as_dict(fid, [('divide', 'i2')]))
                    else:
                        chan_info.update(read_as_dict(fid, [('interleave', 'i2')]))

                chan_info['type'] = dict_kind[chan_info['kind']]

                if chan_info['blocks'] == 0:
                    chan_info['t_start'] = 0.  # this means empty signals
                else:
                    # read the first data-block header to get the start time
                    fid.seek(chan_info['firstblock'])
                    block_info = read_as_dict(fid, blockHeaderDesciption)
                    chan_info['t_start'] = float(block_info['start_time']) * \
                        float(info['us_per_time']) * float(info['dtime_base'])

                self._channel_infos.append(chan_info)

        # get data blocks index for all channel
        # run through all data blocks of each channel to prepare chan-to-block maps
        self._memmap = np.memmap(self.filename, dtype='u1', offset=0, mode='r')

        self._all_data_blocks = {}
        self._by_seg_data_blocks = {}
        for chan_id, chan_info in enumerate(self._channel_infos):
            data_blocks = []
            ind = chan_info['firstblock']
            for b in range(chan_info['blocks']):
                # follow the linked list of blocks via 'succ_block'
                block_info = self._memmap[ind:ind + 20].view(blockHeaderDesciption)[0]
                data_blocks.append((ind, block_info['items'], 0,
                                    block_info['start_time'], block_info['end_time']))
                ind = block_info['succ_block']

            data_blocks = np.array(data_blocks, dtype=[(
                'pos', 'int32'), ('size', 'int32'), ('cumsum', 'int32'),
                ('start_time', 'int32'), ('end_time', 'int32')])
            data_blocks['pos'] += 20  # 20 is the block header size
            self._all_data_blocks[chan_id] = data_blocks
            self._by_seg_data_blocks[chan_id] = []

        # For all signal channels detect gaps between data blocks (pause in rec),
        # each gap starting a new Segment.
        # Then check that all channels have the same gaps.
        # This part is tricky because we need to check that all channels share pauses.
        all_gaps_block_ind = {}
        for chan_id, chan_info in enumerate(self._channel_infos):
            if chan_info['kind'] in [1, 9]:
                data_blocks = self._all_data_blocks[chan_id]
                sig_size = np.sum(self._all_data_blocks[chan_id]['size'])
                if sig_size > 0:
                    interval = get_sample_interval(info, chan_info) / self._time_factor
                    # detect gaps: inter-block silence longer than one sample
                    inter_block_sizes = data_blocks['start_time'][1:] - \
                        data_blocks['end_time'][:-1]
                    gaps_block_ind, = np.nonzero(inter_block_sizes > interval)
                    all_gaps_block_ind[chan_id] = gaps_block_ind

        # find t_start/t_stop for each segment based on the gap indexes
        self._sig_t_starts = {}
        self._sig_t_stops = {}
        if len(all_gaps_block_ind) == 0:
            # this means no signal channels
            nb_segment = 1
            # loop over event/spike channels to get the max time
            t_start, t_stop = None, None
            for chan_id, chan_info in enumerate(self._channel_infos):
                data_blocks = self._all_data_blocks[chan_id]
                if data_blocks.size > 0:
                    if t_stop is None or data_blocks[-1]['end_time'] > t_stop:
                        t_stop = data_blocks[-1]['end_time']
            # segment start is pinned at tick 0 (not the first event time)
            self._seg_t_starts = [0]
            self._seg_t_stops = [t_stop]
        else:
            all_nb_seg = np.array([v.size + 1 for v in all_gaps_block_ind.values()])
            assert np.all(all_nb_seg[0] == all_nb_seg), \
                'Signal channel have differents pause so diffrents nb_segment'
            nb_segment = int(all_nb_seg[0])

            for chan_id, gaps_block_ind in all_gaps_block_ind.items():
                data_blocks = self._all_data_blocks[chan_id]
                self._sig_t_starts[chan_id] = []
                self._sig_t_stops[chan_id] = []

                for seg_ind in range(nb_segment):
                    # first block of this segment: right after the previous gap
                    if seg_ind == 0:
                        fisrt_bl = 0
                    else:
                        fisrt_bl = gaps_block_ind[seg_ind - 1] + 1
                    self._sig_t_starts[chan_id].append(data_blocks[fisrt_bl]['start_time'])

                    # last block of this segment: the gap itself, or the end
                    if seg_ind < nb_segment - 1:
                        last_bl = gaps_block_ind[seg_ind]
                    else:
                        last_bl = data_blocks.size - 1
                    self._sig_t_stops[chan_id].append(data_blocks[last_bl]['end_time'])

                    # per-segment view with a running sample count for slicing
                    in_seg_data_block = data_blocks[fisrt_bl:last_bl + 1]
                    in_seg_data_block['cumsum'][1:] = np.cumsum(in_seg_data_block['size'][:-1])
                    self._by_seg_data_blocks[chan_id].append(in_seg_data_block)

            self._seg_t_starts = []
            self._seg_t_stops = []
            for seg_ind in range(nb_segment):
                # there is a small delay between channels so take min/max for t_start/t_stop
                t_start = min(
                    self._sig_t_starts[chan_id][seg_ind] for chan_id in self._sig_t_starts)
                t_stop = max(self._sig_t_stops[chan_id][seg_ind] for chan_id in self._sig_t_stops)
                self._seg_t_starts.append(t_start)
                self._seg_t_stops.append(t_stop)

        # create typed channels
        sig_channels = []
        unit_channels = []
        event_channels = []

        self.internal_unit_ids = {}
        for chan_id, chan_info in enumerate(self._channel_infos):
            if chan_info['kind'] in [1, 6, 7, 9]:
                if self.take_ideal_sampling_rate:
                    sampling_rate = info['ideal_rate']
                else:
                    sample_interval = get_sample_interval(info, chan_info)
                    sampling_rate = (1. / sample_interval)

            name = chan_info['title']

            if chan_info['kind'] in [1, 9]:
                # AnalogSignal
                if chan_id not in self._sig_t_starts:
                    continue
                units = chan_info['unit']
                if chan_info['kind'] == 1:  # int16
                    gain = chan_info['scale'] / 6553.6
                    offset = chan_info['offset']
                    sig_dtype = 'int16'
                elif chan_info['kind'] == 9:  # float32
                    gain = 1.
                    offset = 0.
                    # NOTE(review): kind 9 stores float32 samples
                    # (see get_channel_dtype) but 'int32' is declared here;
                    # the byte width matches (4 bytes) so reads do not crash,
                    # but this looks wrong -- confirm whether it should be
                    # 'float32'.
                    sig_dtype = 'int32'
                group_id = 0
                sig_channels.append((name, chan_id, sampling_rate, sig_dtype,
                                     units, gain, offset, group_id))

            elif chan_info['kind'] in [2, 3, 4, 5, 8]:
                # Event
                event_channels.append((name, chan_id, 'event'))

            elif chan_info['kind'] in [6, 7]:  # SpikeTrain with waveforms
                wf_units = chan_info['unit']
                if chan_info['kind'] == 6:
                    wf_gain = chan_info['scale'] / 6553.6
                    wf_offset = chan_info['offset']
                    wf_left_sweep = chan_info['n_extra'] // 4
                elif chan_info['kind'] == 7:
                    wf_gain = 1.
                    wf_offset = 0.
                    wf_left_sweep = chan_info['n_extra'] // 8
                wf_sampling_rate = sampling_rate
                if self.ced_units:
                    # this is a huge pain because we need
                    # to jump over all blocks to collect the marker bytes
                    data_blocks = self._all_data_blocks[chan_id]
                    dt = get_channel_dtype(chan_info)
                    unit_ids = set()
                    for bl in range(data_blocks.size):
                        ind0 = data_blocks[bl]['pos']
                        ind1 = data_blocks[bl]['size'] * dt.itemsize + ind0
                        raw_data = self._memmap[ind0:ind1].view(dt)
                        marker = raw_data['marker'] & 255
                        unit_ids.update(np.unique(marker))
                    unit_ids = sorted(list(unit_ids))
                else:
                    # All spikes from one channel are grouped in one SpikeTrain
                    unit_ids = ['all']
                for unit_id in unit_ids:
                    unit_index = len(unit_channels)
                    self.internal_unit_ids[unit_index] = (chan_id, unit_id)
                    _id = "ch{}#{}".format(chan_id, unit_id)
                    unit_channels.append((name, _id, wf_units, wf_gain, wf_offset,
                                          wf_left_sweep, wf_sampling_rate))

        sig_channels = np.array(sig_channels, dtype=_signal_channel_dtype)
        unit_channels = np.array(unit_channels, dtype=_unit_channel_dtype)
        event_channels = np.array(event_channels, dtype=_event_channel_dtype)

        if len(sig_channels) > 0:
            # signal channels can have different sampling_rate/dtype/t_start/
            # signal_length...; grouping them is difficult, so each channel = one group
            sig_channels['group_id'] = np.arange(sig_channels.size)
        self._sig_dtypes = {s['group_id']: np.dtype(s['dtype']) for s in sig_channels}

        # fill into header dict
        self.header = {}
        self.header['nb_block'] = 1
        self.header['nb_segment'] = [nb_segment]
        self.header['signal_channels'] = sig_channels
        self.header['unit_channels'] = unit_channels
        self.header['event_channels'] = event_channels

        # Annotations
        self._generate_minimal_annotations()
        bl_ann = self.raw_annotations['blocks'][0]
        bl_ann['system_id'] = info['system_id']
        seg_ann = bl_ann['segments'][0]
        seg_ann['system_id'] = info['system_id']
        for c, sig_channel in enumerate(sig_channels):
            chan_id = sig_channel['id']
            anasig_an = seg_ann['signals'][c]
            anasig_an['physical_channel_index'] = self._channel_infos[chan_id]['phy_chan']
            anasig_an['comment'] = self._channel_infos[chan_id]['comment']
        for c, unit_channel in enumerate(unit_channels):
            chan_id, unit_id = self.internal_unit_ids[c]
            unit_an = seg_ann['units'][c]
            unit_an['physical_channel_index'] = self._channel_infos[chan_id]['phy_chan']
            unit_an['comment'] = self._channel_infos[chan_id]['comment']
        for c, event_channel in enumerate(event_channels):
            chan_id = int(event_channel['id'])
            ev_an = seg_ann['events'][c]
            ev_an['physical_channel_index'] = self._channel_infos[chan_id]['phy_chan']
            ev_an['comment'] = self._channel_infos[chan_id]['comment']
  253. def _source_name(self):
  254. return self.filename
  255. def _segment_t_start(self, block_index, seg_index):
  256. return self._seg_t_starts[seg_index] * self._time_factor
  257. def _segment_t_stop(self, block_index, seg_index):
  258. return self._seg_t_stops[seg_index] * self._time_factor
  259. def _check_channel_indexes(self, channel_indexes):
  260. if channel_indexes is None:
  261. channel_indexes = slice(None)
  262. channel_indexes = np.arange(self.header['signal_channels'].size)[channel_indexes]
  263. assert len(channel_indexes) == 1
  264. return channel_indexes
  265. def _get_signal_size(self, block_index, seg_index, channel_indexes):
  266. channel_indexes = self._check_channel_indexes(channel_indexes)
  267. chan_id = self.header['signal_channels'][channel_indexes[0]]['id']
  268. sig_size = np.sum(self._by_seg_data_blocks[chan_id][seg_index]['size'])
  269. return sig_size
  270. def _get_signal_t_start(self, block_index, seg_index, channel_indexes):
  271. channel_indexes = self._check_channel_indexes(channel_indexes)
  272. chan_id = self.header['signal_channels'][channel_indexes[0]]['id']
  273. return self._sig_t_starts[chan_id][seg_index] * self._time_factor
    def _get_analogsignal_chunk(self, block_index, seg_index, i_start, i_stop, channel_indexes):
        """
        Read raw samples [i_start:i_stop] for the selected channel(s) of one
        segment by stitching together the relevant on-disk data blocks.
        Returns a (n_samples, n_channels) array of the channel's raw dtype.
        """
        if i_start is None:
            i_start = 0
        if i_stop is None:
            i_stop = self._get_signal_size(block_index, seg_index, channel_indexes)

        channel_indexes = self._check_channel_indexes(channel_indexes)
        chan_index = channel_indexes[0]
        chan_id = self.header['signal_channels'][chan_index]['id']
        group_id = self.header['signal_channels'][channel_indexes[0]]['group_id']
        dt = self._sig_dtypes[group_id]

        raw_signals = np.zeros((i_stop - i_start, len(channel_indexes)), dtype=dt)
        for c, channel_index in enumerate(channel_indexes):
            # NOTE: this is slow because we run through
            # the file for each channel. The loop should be reversed.
            # But there is no guaranty that channels share the same data block
            # indexes. So this would make the job too difficult.
            chan_header = self.header['signal_channels'][channel_index]
            chan_id = chan_header['id']

            data_blocks = self._by_seg_data_blocks[chan_id][seg_index]

            # locate the span of blocks covering [i_start, i_stop) via the
            # per-segment cumulative sample counts
            bl0 = np.searchsorted(data_blocks['cumsum'], i_start, side='left')
            bl1 = np.searchsorted(data_blocks['cumsum'], i_stop, side='left')
            ind = 0
            for bl in range(bl0, bl1):
                ind0 = data_blocks[bl]['pos']
                ind1 = data_blocks[bl]['size'] * dt.itemsize + ind0
                data = self._memmap[ind0:ind1].view(dt)
                if bl == bl1 - 1:
                    # right border: trim samples beyond i_stop
                    # be careful that bl could be both bl0 and bl1!!
                    border = data.size - (i_stop - data_blocks[bl]['cumsum'])
                    if border > 0:
                        data = data[:-border]
                if bl == bl0:
                    # left border: drop samples before i_start
                    border = i_start - data_blocks[bl]['cumsum']
                    data = data[border:]
                raw_signals[ind:data.size + ind, c] = data
                ind += data.size
        return raw_signals
    def _count_in_time_slice(self, seg_index, chan_id, lim0, lim1, marker_filter=None):
        """
        Count events or spikes on *chan_id* whose tick lies in [lim0, lim1],
        optionally keeping only records whose low marker byte equals
        *marker_filter*.
        """
        # count event or spike in time slice
        data_blocks = self._all_data_blocks[chan_id]
        chan_info = self._channel_infos[chan_id]
        dt = get_channel_dtype(chan_info)
        nb = 0
        for bl in range(data_blocks.size):
            ind0 = data_blocks[bl]['pos']
            ind1 = data_blocks[bl]['size'] * dt.itemsize + ind0
            raw_data = self._memmap[ind0:ind1].view(dt)
            ts = raw_data['tick']
            keep = (ts >= lim0) & (ts <= lim1)
            if marker_filter is not None:
                # low byte of the marker identifies the CED unit
                keep2 = (raw_data['marker'] & 255) == marker_filter
                keep = keep & keep2
            nb += np.sum(keep)
            # blocks are time-ordered: once past lim1 we can stop
            # NOTE(review): ts[-1] would raise IndexError on a zero-item
            # block -- presumably blocks are never empty; confirm.
            if ts[-1] > lim1:
                break
        return nb
    def _get_internal_timestamp_(self, seg_index, chan_id,
                                 t_start, t_stop, other_field=None, marker_filter=None):
        """
        Collect timestamps (ticks) of one event/spike channel between
        t_start and t_stop (seconds; None means the segment limits).

        If *other_field* is given (e.g. 'marker', 'label', 'waveform'), the
        matching values are returned as a second array. *marker_filter*
        keeps only records whose low marker byte equals it.
        """
        chan_info = self._channel_infos[chan_id]
        data_blocks = self._all_data_blocks[chan_id]
        dt = get_channel_dtype(chan_info)

        # convert second limits to integer tick limits
        if t_start is None:
            lim0 = self._seg_t_starts[seg_index]
        else:
            lim0 = int(t_start / self._time_factor)
        if t_stop is None:
            lim1 = self._seg_t_stops[seg_index]
        else:
            lim1 = int(t_stop / self._time_factor)

        timestamps = []
        othervalues = []
        for bl in range(data_blocks.size):
            ind0 = data_blocks[bl]['pos']
            ind1 = data_blocks[bl]['size'] * dt.itemsize + ind0
            raw_data = self._memmap[ind0:ind1].view(dt)
            ts = raw_data['tick']
            keep = (ts >= lim0) & (ts <= lim1)
            if marker_filter is not None:
                keep2 = (raw_data['marker'] & 255) == marker_filter
                keep = keep & keep2
            timestamps.append(ts[keep])
            if other_field is not None:
                othervalues.append(raw_data[other_field][keep])
            # blocks are time-ordered: stop once past the upper limit
            if ts[-1] > lim1:
                break

        if len(timestamps) > 0:
            timestamps = np.concatenate(timestamps)
        else:
            timestamps = np.zeros(0, dtype='int16')

        if other_field is None:
            return timestamps
        else:
            if len(timestamps) > 0:
                othervalues = np.concatenate(othervalues)
            else:
                othervalues = np.zeros(0, dtype=dt.fields[other_field][0])
            return timestamps, othervalues
  377. def _spike_count(self, block_index, seg_index, unit_index):
  378. chan_id, unit_id = self.internal_unit_ids[unit_index]
  379. if self.ced_units:
  380. marker_filter = unit_id
  381. else:
  382. marker_filter = None
  383. lim0 = self._seg_t_starts[seg_index]
  384. lim1 = self._seg_t_stops[seg_index]
  385. return self._count_in_time_slice(seg_index, chan_id,
  386. lim0, lim1, marker_filter=marker_filter)
  387. def _get_spike_timestamps(self, block_index, seg_index, unit_index, t_start, t_stop):
  388. unit_header = self.header['unit_channels'][unit_index]
  389. chan_id, unit_id = self.internal_unit_ids[unit_index]
  390. if self.ced_units:
  391. marker_filter = unit_id
  392. else:
  393. marker_filter = None
  394. spike_timestamps = self._get_internal_timestamp_(seg_index,
  395. chan_id, t_start, t_stop,
  396. marker_filter=marker_filter)
  397. return spike_timestamps
  398. def _rescale_spike_timestamp(self, spike_timestamps, dtype):
  399. spike_times = spike_timestamps.astype(dtype)
  400. spike_times *= self._time_factor
  401. return spike_times
  402. def _get_spike_raw_waveforms(self, block_index, seg_index, unit_index, t_start, t_stop):
  403. unit_header = self.header['unit_channels'][unit_index]
  404. chan_id, unit_id = self.internal_unit_ids[unit_index]
  405. if self.ced_units:
  406. marker_filter = unit_id
  407. else:
  408. marker_filter = None
  409. timestamps, waveforms = self._get_internal_timestamp_(seg_index, chan_id,
  410. t_start, t_stop,
  411. other_field='waveform',
  412. marker_filter=marker_filter)
  413. waveforms = waveforms.reshape(timestamps.size, 1, -1)
  414. return waveforms
  415. def _event_count(self, block_index, seg_index, event_channel_index):
  416. event_header = self.header['event_channels'][event_channel_index]
  417. chan_id = int(event_header['id']) # because set to string in header
  418. lim0 = self._seg_t_starts[seg_index]
  419. lim1 = self._seg_t_stops[seg_index]
  420. return self._count_in_time_slice(seg_index, chan_id, lim0, lim1, marker_filter=None)
  421. def _get_event_timestamps(self, block_index, seg_index, event_channel_index, t_start, t_stop):
  422. event_header = self.header['event_channels'][event_channel_index]
  423. chan_id = int(event_header['id']) # because set to string in header
  424. chan_info = self._channel_infos[chan_id]
  425. if chan_info['kind'] == 5:
  426. timestamps, labels = self._get_internal_timestamp_(seg_index,
  427. chan_id, t_start, t_stop,
  428. other_field='marker')
  429. elif chan_info['kind'] == 8:
  430. timestamps, labels = self._get_internal_timestamp_(seg_index,
  431. chan_id, t_start, t_stop,
  432. other_field='label')
  433. else:
  434. timestamps = self._get_internal_timestamp_(seg_index,
  435. chan_id, t_start, t_stop, other_field=None)
  436. labels = np.zeros(timestamps.size, dtype='U')
  437. labels = labels.astype('U')
  438. durations = None
  439. return timestamps, durations, labels
  440. def _rescale_event_timestamp(self, event_timestamps, dtype):
  441. event_times = event_timestamps.astype(dtype)
  442. event_times *= self._time_factor
  443. return event_times
  444. def read_as_dict(fid, dtype):
  445. """
  446. Given a file descriptor (seek at the good place externally)
  447. and a numpy.dtype of the binary struct return a dict.
  448. Make conversion for strings.
  449. """
  450. dt = np.dtype(dtype)
  451. h = np.frombuffer(fid.read(dt.itemsize), dt)[0]
  452. info = OrderedDict()
  453. for k in dt.names:
  454. v = h[k]
  455. if dt[k].kind == 'S':
  456. v = v.decode('iso-8859-1')
  457. if len(v) > 0:
  458. l = ord(v[0])
  459. v = v[1:l + 1]
  460. info[k] = v
  461. return info
  462. def get_channel_dtype(chan_info):
  463. """
  464. Get dtype by kind.
  465. """
  466. if chan_info['kind'] == 1: # Raw signal
  467. dt = 'int16'
  468. elif chan_info['kind'] in [2, 3, 4]: # Event data
  469. dt = [('tick', 'i4')]
  470. elif chan_info['kind'] in [5]: # Marker data
  471. dt = [('tick', 'i4'), ('marker', 'i4')]
  472. elif chan_info['kind'] in [6]: # AdcMark data (waveform)
  473. dt = [('tick', 'i4'), ('marker', 'i4'),
  474. # ('adc', 'S%d' % chan_info['n_extra'])]
  475. ('waveform', 'int16', chan_info['n_extra'] // 2)]
  476. elif chan_info['kind'] in [7]: # RealMark data (waveform)
  477. dt = [('tick', 'i4'), ('marker', 'i4'),
  478. # ('real', 'S%d' % chan_info['n_extra'])]
  479. ('waveform', 'float32', chan_info['n_extra'] // 4)]
  480. elif chan_info['kind'] in [8]: # TextMark data
  481. dt = [('tick', 'i4'), ('marker', 'i4'),
  482. ('label', 'S%d' % chan_info['n_extra'])]
  483. elif chan_info['kind'] == 9: # Float signal
  484. dt = 'float32'
  485. dt = np.dtype(dt)
  486. return dt
  487. def get_sample_interval(info, chan_info):
  488. """
  489. Get sample interval for one channel
  490. """
  491. if info['system_id'] in [1, 2, 3, 4, 5]: # Before version 5
  492. sample_interval = (chan_info['divide'] * info['us_per_time'] *
  493. info['time_per_adc']) * 1e-6
  494. else:
  495. sample_interval = (chan_info['l_chan_dvd'] *
  496. info['us_per_time'] * info['dtime_base'])
  497. return sample_interval
# headers structures :
# Binary layout of the global file header, read once at offset 0.
headerDescription = [
    ('system_id', 'i2'),        # file/format version; <6 triggers legacy defaults
    ('copyright', 'S10'),
    ('creator', 'S8'),
    ('us_per_time', 'i2'),      # tick duration in us_per_time * dtime_base seconds
    ('time_per_adc', 'i2'),     # used for sample interval in old files
    ('filestate', 'i2'),
    ('first_data', 'i4'),  # i8
    ('channels', 'i2'),         # number of channel header slots
    ('chan_size', 'i2'),
    ('extra_data', 'i2'),
    ('buffersize', 'i2'),
    ('os_format', 'i2'),
    ('max_ftime', 'i4'),  # i8
    ('dtime_base', 'f8'),       # seconds per time unit (forced to 1e-6 for old files)
    ('datetime_detail', 'u1'),
    ('datetime_year', 'i2'),
    ('pad', 'S52'),
    ('comment1', 'S80'),
    ('comment2', 'S80'),
    ('comment3', 'S80'),
    ('comment4', 'S80'),
    ('comment5', 'S80'),
]
# Common part of each per-channel header, read at offset 512 + 140 * chan_id.
# Kind-dependent extra fields are appended afterwards in _parse_header.
# (Name keeps its historical misspelling; it is referenced as-is.)
channelHeaderDesciption1 = [
    ('del_size', 'i2'),
    ('next_del_block', 'i4'),  # i8
    ('firstblock', 'i4'),  # i8  offset of the first data block of the channel
    ('lastblock', 'i4'),  # i8
    ('blocks', 'i2'),           # number of data blocks (0 means empty channel)
    ('n_extra', 'i2'),          # size of waveform/label payload per record
    ('pre_trig', 'i2'),
    ('free0', 'i2'),
    ('py_sz', 'i2'),
    ('max_data', 'i2'),
    ('comment', 'S72'),
    ('max_chan_time', 'i4'),  # i8
    ('l_chan_dvd', 'i4'),  # i8  sample-interval divisor in new files
    ('phy_chan', 'i2'),         # physical channel index (exported as annotation)
    ('title', 'S10'),           # channel name
    ('ideal_rate', 'f4'),       # declared rate, used when take_ideal_sampling_rate
    ('kind', 'u1'),             # channel type code, see dict_kind
    ('unused1', 'i1'),
]
# 20-byte header at the start of every data block; blocks form a linked
# list through 'succ_block' (followed in _parse_header).
blockHeaderDesciption = [
    ('pred_block', 'i4'),  # i8
    ('succ_block', 'i4'),  # i8  offset of the next block of the same channel
    ('start_time', 'i4'),  # i8  tick of the first item in the block
    ('end_time', 'i4'),  # i8  tick of the last item in the block
    ('channel_num', 'i2'),
    ('items', 'i2'),       # number of records/samples stored in the block
]
# Map of the numeric channel 'kind' code to a human-readable type name
# (stored as chan_info['type'] in _parse_header).
dict_kind = {
    0: 'empty',
    1: 'Adc',          # int16 analog signal
    2: 'EventFall',
    3: 'EventRise',
    4: 'EventBoth',
    5: 'Marker',
    6: 'AdcMark',      # spikes with int16 waveforms
    7: 'RealMark',     # spikes with float32 waveforms
    8: 'TextMark',
    9: 'RealWave',     # float32 analog signal
}