test_neuralynxio.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456
  1. # -*- coding: utf-8 -*-
  2. """
  3. Tests of neo.io.blackrockio
  4. """
  5. # needed for python 3 compatibility
  6. from __future__ import absolute_import
  7. import os
  8. import sys
  9. import re
  10. import warnings
  11. try:
  12. import unittest2 as unittest
  13. except ImportError:
  14. import unittest
  15. import numpy as np
  16. import quantities as pq
  17. from neo import NeuralynxIO, AnalogSignal, SpikeTrain, Event
  18. from neo.test.iotest.common_io_test import BaseTestIO
  19. from neo.core import Segment
  20. class CommonTests(BaseTestIO):
  21. ioclass = NeuralynxIO
  22. files_to_test = []
  23. files_to_download = [
  24. 'Cheetah_v5.5.1/original_data/CheetahLogFile.txt',
  25. 'Cheetah_v5.5.1/original_data/CheetahLostADRecords.txt',
  26. 'Cheetah_v5.5.1/original_data/Events.nev',
  27. 'Cheetah_v5.5.1/original_data/STet3a.nse',
  28. 'Cheetah_v5.5.1/original_data/STet3b.nse',
  29. 'Cheetah_v5.5.1/original_data/Tet3a.ncs',
  30. 'Cheetah_v5.5.1/original_data/Tet3b.ncs',
  31. 'Cheetah_v5.5.1/plain_data/STet3a.txt',
  32. 'Cheetah_v5.5.1/plain_data/STet3b.txt',
  33. 'Cheetah_v5.5.1/plain_data/Tet3a.txt',
  34. 'Cheetah_v5.5.1/plain_data/Tet3b.txt',
  35. 'Cheetah_v5.5.1/plain_data/Events.txt',
  36. 'Cheetah_v5.5.1/README.txt',
  37. 'Cheetah_v5.7.4/original_data/CSC1.ncs',
  38. 'Cheetah_v5.7.4/original_data/CSC2.ncs',
  39. 'Cheetah_v5.7.4/original_data/CSC3.ncs',
  40. 'Cheetah_v5.7.4/original_data/CSC4.ncs',
  41. 'Cheetah_v5.7.4/original_data/CSC5.ncs',
  42. 'Cheetah_v5.7.4/original_data/Events.nev',
  43. 'Cheetah_v5.7.4/plain_data/CSC1.txt',
  44. 'Cheetah_v5.7.4/plain_data/CSC2.txt',
  45. 'Cheetah_v5.7.4/plain_data/CSC3.txt',
  46. 'Cheetah_v5.7.4/plain_data/CSC4.txt',
  47. 'Cheetah_v5.7.4/plain_data/CSC5.txt',
  48. 'Cheetah_v5.7.4/plain_data/Events.txt',
  49. 'Cheetah_v5.7.4/README.txt']
  50. def setUp(self):
  51. super(CommonTests, self).setUp()
  52. data_dir = os.path.join(self.local_test_dir,
  53. 'Cheetah_v{}'.format(self.cheetah_version))
  54. self.sn = os.path.join(data_dir, 'original_data')
  55. self.pd = os.path.join(data_dir, 'plain_data')
  56. if not os.path.exists(self.sn):
  57. raise unittest.SkipTest('data file does not exist:' + self.sn)
  58. class TestCheetah_v551(CommonTests, unittest.TestCase):
  59. cheetah_version = '5.5.1'
  60. def test_read_block(self):
  61. """Read data in a certain time range into one block"""
  62. t_start, t_stop = 3 * pq.s, 4 * pq.s
  63. nio = NeuralynxIO(self.sn, use_cache='never')
  64. block = nio.read_block(t_starts=[t_start], t_stops=[t_stop])
  65. self.assertEqual(len(nio.parameters_ncs), 2)
  66. self.assertTrue(
  67. {'event_id': 11, 'name': 'Starting Recording', 'nttl': 0} in
  68. nio.parameters_nev['Events.nev']['event_types'])
  69. # Everything put in one segment
  70. self.assertEqual(len(block.segments), 1)
  71. seg = block.segments[0]
  72. self.assertEqual(len(seg.analogsignals), 1)
  73. self.assertEqual(seg.analogsignals[0].shape[-1], 2)
  74. self.assertEqual(seg.analogsignals[0].sampling_rate.units,
  75. pq.CompoundUnit('32*kHz'))
  76. self.assertEqual(seg.analogsignals[0].t_start, t_start)
  77. self.assertEqual(seg.analogsignals[0].t_stop, t_stop)
  78. self.assertEqual(len(seg.spiketrains), 2)
  79. # Testing different parameter combinations
  80. block = nio.read_block(lazy=True)
  81. self.assertEqual(len(block.segments[0].analogsignals[0]), 0)
  82. self.assertEqual(len(block.segments[0].spiketrains[0]), 0)
  83. block = nio.read_block(cascade=False)
  84. self.assertEqual(len(block.segments), 0)
  85. block = nio.read_block(electrode_list=[0])
  86. self.assertEqual(len(block.segments[0].analogsignals), 1)
  87. self.assertEqual(len(block.channel_indexes[-1].units), 1)
  88. block = nio.read_block(t_starts=None, t_stops=None, events=True,
  89. waveforms=True)
  90. self.assertEqual(len(block.segments[0].analogsignals), 1)
  91. self.assertEqual(len(block.segments[0].spiketrains), 2)
  92. self.assertEqual(len(block.segments[0].spiketrains[0].waveforms),
  93. len(block.segments[0].spiketrains[0]))
  94. self.assertGreater(len(block.segments[0].events), 0)
  95. self.assertEqual(len(block.channel_indexes[-1].units), 2)
  96. block = nio.read_block(t_starts=[t_start], t_stops=[t_stop],
  97. unit_list=[0], electrode_list=[0])
  98. self.assertEqual(len(block.channel_indexes[-1].units), 1)
  99. block = nio.read_block(t_starts=[t_start], t_stops=[t_stop],
  100. unit_list=False)
  101. self.assertEqual(len(block.channel_indexes[-1].units), 0)
  102. def test_read_segment(self):
  103. """Read data in a certain time range into one block"""
  104. nio = NeuralynxIO(self.sn, use_cache='never')
  105. seg = nio.read_segment(t_start=None, t_stop=None)
  106. self.assertEqual(len(seg.analogsignals), 1)
  107. self.assertEqual(seg.analogsignals[0].shape[-1], 2)
  108. self.assertEqual(seg.analogsignals[0].sampling_rate.units,
  109. pq.CompoundUnit('32*kHz'))
  110. self.assertEqual(len(seg.spiketrains), 2)
  111. # Testing different parameter combinations
  112. seg = nio.read_segment(lazy=True)
  113. self.assertEqual(len(seg.analogsignals[0]), 0)
  114. self.assertEqual(len(seg.spiketrains[0]), 0)
  115. seg = nio.read_segment(cascade=False)
  116. self.assertEqual(len(seg.analogsignals), 0)
  117. self.assertEqual(len(seg.spiketrains), 0)
  118. seg = nio.read_segment(electrode_list=[0])
  119. self.assertEqual(len(seg.analogsignals), 1)
  120. seg = nio.read_segment(t_start=None, t_stop=None, events=True,
  121. waveforms=True)
  122. self.assertEqual(len(seg.analogsignals), 1)
  123. self.assertEqual(len(seg.spiketrains), 2)
  124. self.assertTrue(len(seg.spiketrains[0].waveforms) > 0)
  125. self.assertTrue(len(seg.events) > 0)
  126. def test_read_ncs_data(self):
  127. t_start, t_stop = 0, 500 * 512 # in samples
  128. nio = NeuralynxIO(self.sn, use_cache='never')
  129. seg = Segment('testsegment')
  130. for el_id, el_dict in nio.parameters_ncs.items():
  131. filepath = nio.parameters_ncs[el_id]['recording_file_name']
  132. filename = filepath.split('/')[-1].split('\\')[-1].split('.')[0]
  133. nio.read_ncs(filename, seg, t_start=t_start, t_stop=t_stop)
  134. anasig = seg.filter({'electrode_id': el_id},
  135. objects=AnalogSignal)[0]
  136. target_data = np.zeros((16679, 512))
  137. with open(self.pd + '/%s.txt' % filename) as datafile:
  138. for i, line in enumerate(datafile):
  139. line = line.strip('\xef\xbb\xbf')
  140. entries = line.split()
  141. target_data[i, :] = np.asarray(entries[4:])
  142. target_data = target_data.reshape((-1, 1)) * el_dict['ADBitVolts']
  143. np.testing.assert_array_equal(target_data[:len(anasig)],
  144. anasig.magnitude)
  145. def test_read_nse_data(self):
  146. t_start, t_stop = None, None # in samples
  147. nio = NeuralynxIO(self.sn, use_cache='never')
  148. seg = Segment('testsegment')
  149. for el_id, el_dict in nio.parameters_nse.items():
  150. filepath = nio.parameters_nse[el_id]['recording_file_name']
  151. filename = filepath.split('/')[-1].split('\\')[-1].split('.')[0]
  152. nio.read_nse(filename, seg, t_start=t_start, t_stop=t_stop,
  153. waveforms=True)
  154. spiketrain = seg.filter({'electrode_id': el_id},
  155. objects=SpikeTrain)[0]
  156. # target_data = np.zeros((500, 32))
  157. # timestamps = np.zeros(500)
  158. entries = []
  159. with open(self.pd + '/%s.txt' % filename) as datafile:
  160. for i, line in enumerate(datafile):
  161. line = line.strip('\xef\xbb\xbf')
  162. entries.append(line.split())
  163. entries = np.asarray(entries, dtype=float)
  164. target_data = entries[:-1, 11:]
  165. timestamps = entries[:-1, 0]
  166. timestamps = (timestamps * pq.microsecond -
  167. nio.parameters_global['t_start'])
  168. np.testing.assert_array_equal(timestamps.magnitude,
  169. spiketrain.magnitude)
  170. np.testing.assert_array_equal(target_data,
  171. spiketrain.waveforms)
  172. def test_read_nev_data(self):
  173. t_start, t_stop = 0 * pq.s, 1000 * pq.s
  174. nio = NeuralynxIO(self.sn, use_cache='never')
  175. seg = Segment('testsegment')
  176. filename = 'Events'
  177. nio.read_nev(filename + '.nev', seg, t_start=t_start, t_stop=t_stop)
  178. timestamps = []
  179. nttls = []
  180. names = []
  181. event_ids = []
  182. with open(self.pd + '/%s.txt' % filename) as datafile:
  183. for i, line in enumerate(datafile):
  184. line = line.strip('\xef\xbb\xbf')
  185. entries = line.split('\t')
  186. nttls.append(int(entries[5]))
  187. timestamps.append(int(entries[3]))
  188. names.append(entries[10].rstrip('\r\n'))
  189. event_ids.append(int(entries[4]))
  190. timestamps = (np.array(timestamps) * pq.microsecond -
  191. nio.parameters_global['t_start'])
  192. # masking only requested spikes
  193. mask = np.where(timestamps < t_stop)[0]
  194. # return if no event fits criteria
  195. if len(mask) == 0:
  196. return
  197. timestamps = timestamps[mask]
  198. nttls = np.asarray(nttls)[mask]
  199. names = np.asarray(names)[mask]
  200. event_ids = np.asarray(event_ids)[mask]
  201. for i in range(len(timestamps)):
  202. events = seg.filter({'nttl': nttls[i]}, objects=Event)
  203. events = [e for e in events
  204. if (e.annotations['marker_id'] == event_ids[i] and
  205. e.labels == names[i])]
  206. self.assertTrue(len(events) == 1)
  207. self.assertTrue(timestamps[i] in events[0].times)
  208. def test_read_ntt_data(self):
  209. pass
  210. # TODO: Implement test_read_ntt_data once ntt files are available
  211. class TestCheetah_v574(TestCheetah_v551, CommonTests, unittest.TestCase):
  212. cheetah_version = '5.7.4'
  213. def test_read_block(self):
  214. """Read data in a certain time range into one block"""
  215. t_start, t_stop = 3 * pq.s, 4 * pq.s
  216. nio = NeuralynxIO(self.sn, use_cache='never')
  217. block = nio.read_block(t_starts=[t_start], t_stops=[t_stop])
  218. self.assertEqual(len(nio.parameters_ncs), 5)
  219. self.assertTrue(
  220. {'event_id': 19, 'name': 'Starting Recording', 'nttl': 0} in
  221. nio.parameters_nev['Events.nev']['event_types'])
  222. self.assertTrue(
  223. {'event_id': 19, 'name': 'Stopping Recording', 'nttl': 0} in
  224. nio.parameters_nev['Events.nev']['event_types'])
  225. # Everything put in one segment
  226. self.assertEqual(len(block.segments), 1)
  227. seg = block.segments[0]
  228. self.assertEqual(len(seg.analogsignals), 1)
  229. self.assertEqual(seg.analogsignals[0].shape[-1], 5)
  230. self.assertEqual(seg.analogsignals[0].sampling_rate.units,
  231. pq.CompoundUnit('32*kHz'))
  232. self.assertAlmostEqual(seg.analogsignals[0].t_start, t_start, places=4)
  233. self.assertAlmostEqual(seg.analogsignals[0].t_stop, t_stop, places=4)
  234. self.assertEqual(len(seg.spiketrains), 0) # no nse files available
  235. # Testing different parameter combinations
  236. block = nio.read_block(lazy=True)
  237. self.assertEqual(len(block.segments[0].analogsignals[0]), 0)
  238. block = nio.read_block(cascade=False)
  239. self.assertEqual(len(block.segments), 0)
  240. block = nio.read_block(electrode_list=[0])
  241. self.assertEqual(len(block.segments[0].analogsignals), 1)
  242. block = nio.read_block(t_starts=None, t_stops=None, events=True,
  243. waveforms=True)
  244. self.assertEqual(len(block.segments[0].analogsignals), 1)
  245. self.assertEqual(len(block.segments[0].spiketrains), 0)
  246. self.assertGreater(len(block.segments[0].events), 0)
  247. self.assertEqual(len(block.channel_indexes), 5)
  248. def test_read_segment(self):
  249. """Read data in a certain time range into one block"""
  250. nio = NeuralynxIO(self.sn, use_cache='never')
  251. seg = nio.read_segment(t_start=None, t_stop=None)
  252. self.assertEqual(len(seg.analogsignals), 1)
  253. self.assertEqual(seg.analogsignals[0].shape[-1], 5)
  254. self.assertEqual(seg.analogsignals[0].sampling_rate.units,
  255. pq.CompoundUnit('32*kHz'))
  256. self.assertEqual(len(seg.spiketrains), 0)
  257. # Testing different parameter combinations
  258. seg = nio.read_segment(lazy=True)
  259. self.assertEqual(len(seg.analogsignals[0]), 0)
  260. self.assertEqual(len(seg.spiketrains), 0)
  261. seg = nio.read_segment(cascade=False)
  262. self.assertEqual(len(seg.analogsignals), 0)
  263. self.assertEqual(len(seg.spiketrains), 0)
  264. seg = nio.read_segment(electrode_list=[0])
  265. self.assertEqual(len(seg.analogsignals), 1)
  266. seg = nio.read_segment(t_start=None, t_stop=None, events=True,
  267. waveforms=True)
  268. self.assertEqual(len(seg.analogsignals), 1)
  269. self.assertEqual(len(seg.spiketrains), 0)
  270. self.assertTrue(len(seg.events) > 0)
  271. class TestGaps(CommonTests, unittest.TestCase):
  272. cheetah_version = '5.5.1'
  273. def test_gap_handling(self):
  274. nio = NeuralynxIO(self.sn, use_cache='never')
  275. block = nio.read_block(t_starts=None, t_stops=None)
  276. # known gap values
  277. n_gaps = 1
  278. self.assertEqual(len(block.segments), n_gaps + 1)
  279. # one channel index for analogsignals for each of the 3 segments and
  280. # one for spiketrains
  281. self.assertEqual(len(block.channel_indexes), len(block.segments) + 1)
  282. self.assertEqual(len(block.channel_indexes[-1].units), 2)
  283. for unit in block.channel_indexes[-1].units:
  284. self.assertEqual(len(unit.spiketrains), n_gaps + 1)
  285. anasig_channels = [i for i in block.channel_indexes
  286. if 'analogsignal' in i.name]
  287. self.assertEqual(len(anasig_channels), n_gaps + 1)
  288. def test_gap_warning(self):
  289. nio = NeuralynxIO(self.sn, use_cache='never')
  290. with reset_warning_registry():
  291. with warnings.catch_warnings(record=True) as w:
  292. warnings.simplefilter('always')
  293. nio.read_block(t_starts=None, t_stops=None)
  294. self.assertGreater(len(w), 0)
  295. self.assertTrue(issubclass(w[0].category, UserWarning))
  296. self.assertEqual('Substituted t_starts and t_stops in order to'
  297. ' skip gap in recording session.',
  298. str(w[0].message))
  299. def test_analogsignal_shortening_warning(self):
  300. nio = NeuralynxIO(self.sn, use_cache='never')
  301. with reset_warning_registry():
  302. with warnings.catch_warnings(record=True) as w:
  303. seg = Segment('testsegment')
  304. nio.read_ncs(os.path.join(self.sn, 'Tet3a.ncs'), seg)
  305. self.assertGreater(len(w), 0)
  306. self.assertTrue(issubclass(w[0].category, UserWarning))
  307. self.assertTrue('Analogsignalarray was shortened due to gap in'
  308. ' recorded data of file'
  309. in str(w[0].message))
  310. # This class is copied from
  311. # 'http://bugs.python.org/file40031/reset_warning_registry.py' by Eli Collins
  312. # and is related to http://bugs.python.org/issue21724 Python<3.4
  313. class reset_warning_registry(object):
  314. """
  315. context manager which archives & clears warning registry for duration of
  316. context.
  317. :param pattern:
  318. optional regex pattern, causes manager to only reset modules whose
  319. names match this pattern. defaults to ``".*"``.
  320. """
  321. #: regexp for filtering which modules are reset
  322. _pattern = None
  323. #: dict mapping module name -> old registry contents
  324. _backup = None
  325. def __init__(self, pattern=None):
  326. self._pattern = re.compile(pattern or ".*")
  327. def __enter__(self):
  328. # archive and clear the __warningregistry__ key for all modules
  329. # that match the 'reset' pattern.
  330. pattern = self._pattern
  331. backup = self._backup = {}
  332. for name, mod in list(sys.modules.items()):
  333. if pattern.match(name):
  334. reg = getattr(mod, "__warningregistry__", None)
  335. if reg:
  336. backup[name] = reg.copy()
  337. reg.clear()
  338. return self
  339. def __exit__(self, *exc_info):
  340. # restore warning registry from backup
  341. modules = sys.modules
  342. backup = self._backup
  343. for name, content in backup.items():
  344. mod = modules.get(name)
  345. if mod is None:
  346. continue
  347. reg = getattr(mod, "__warningregistry__", None)
  348. if reg is None:
  349. setattr(mod, "__warningregistry__", content)
  350. else:
  351. reg.clear()
  352. reg.update(content)
  353. # clear all registry entries that we didn't archive
  354. pattern = self._pattern
  355. for name, mod in list(modules.items()):
  356. if pattern.match(name) and name not in backup:
  357. reg = getattr(mod, "__warningregistry__", None)
  358. if reg:
  359. reg.clear()
  360. if __name__ == '__main__':
  361. unittest.main()