test_neuralynxio.py 18 KB


  1. """
  2. Tests of neo.io.neuralynxio.py
  3. """
  4. import time
  5. import warnings
  6. import unittest
  7. import numpy as np
  8. import quantities as pq
  9. from neo.test.iotest.common_io_test import BaseTestIO
  10. from neo.core import *
  11. from neo.io.neuralynxio import NeuralynxIO
  12. from neo.io.neuralynxio import NeuralynxIO as NewNeuralynxIO
  13. from neo.io.neuralynxio_v1 import NeuralynxIO as OldNeuralynxIO
  14. from neo import AnalogSignal
  15. class CommonNeuralynxIOTest(BaseTestIO, unittest.TestCase, ):
  16. ioclass = NeuralynxIO
  17. files_to_test = [
  18. # 'Cheetah_v4.0.2/original_data',
  19. 'Cheetah_v5.5.1/original_data',
  20. 'Cheetah_v5.6.3/original_data',
  21. 'Cheetah_v5.7.4/original_data',
  22. 'Pegasus_v2.1.1',
  23. 'Cheetah_v6.3.2/incomplete_blocks']
  24. files_to_download = [
  25. 'Cheetah_v4.0.2/original_data/CSC14_trunc.Ncs',
  26. 'Cheetah_v5.5.1/original_data/CheetahLogFile.txt',
  27. 'Cheetah_v5.5.1/original_data/CheetahLostADRecords.txt',
  28. 'Cheetah_v5.5.1/original_data/Events.nev',
  29. 'Cheetah_v5.5.1/original_data/STet3a.nse',
  30. 'Cheetah_v5.5.1/original_data/STet3b.nse',
  31. 'Cheetah_v5.5.1/original_data/Tet3a.ncs',
  32. 'Cheetah_v5.5.1/original_data/Tet3b.ncs',
  33. 'Cheetah_v5.5.1/plain_data/STet3a.txt',
  34. 'Cheetah_v5.5.1/plain_data/STet3b.txt',
  35. 'Cheetah_v5.5.1/plain_data/Tet3a.txt',
  36. 'Cheetah_v5.5.1/plain_data/Tet3b.txt',
  37. 'Cheetah_v5.5.1/plain_data/Events.txt',
  38. 'Cheetah_v5.5.1/README.txt',
  39. 'Cheetah_v5.6.3/original_data/CheetahLogFile.txt',
  40. 'Cheetah_v5.6.3/original_data/CheetahLostADRecords.txt',
  41. 'Cheetah_v5.6.3/original_data/Events.nev',
  42. 'Cheetah_v5.6.3/original_data/CSC1.ncs',
  43. 'Cheetah_v5.6.3/original_data/CSC2.ncs',
  44. 'Cheetah_v5.6.3/original_data/TT1.ntt',
  45. 'Cheetah_v5.6.3/original_data/TT2.ntt',
  46. 'Cheetah_v5.6.3/original_data/VT1.nvt',
  47. 'Cheetah_v5.6.3/plain_data/Events.txt',
  48. 'Cheetah_v5.6.3/plain_data/CSC1.txt',
  49. 'Cheetah_v5.6.3/plain_data/CSC2.txt',
  50. 'Cheetah_v5.6.3/plain_data/TT1.txt',
  51. 'Cheetah_v5.6.3/plain_data/TT2.txt',
  52. 'Cheetah_v5.6.3/original_data/VT1.nvt',
  53. 'Cheetah_v5.7.4/original_data/CSC1.ncs',
  54. 'Cheetah_v5.7.4/original_data/CSC2.ncs',
  55. 'Cheetah_v5.7.4/original_data/CSC3.ncs',
  56. 'Cheetah_v5.7.4/original_data/CSC4.ncs',
  57. 'Cheetah_v5.7.4/original_data/CSC5.ncs',
  58. 'Cheetah_v5.7.4/original_data/Events.nev',
  59. 'Cheetah_v5.7.4/plain_data/CSC1.txt',
  60. 'Cheetah_v5.7.4/plain_data/CSC2.txt',
  61. 'Cheetah_v5.7.4/plain_data/CSC3.txt',
  62. 'Cheetah_v5.7.4/plain_data/CSC4.txt',
  63. 'Cheetah_v5.7.4/plain_data/CSC5.txt',
  64. 'Cheetah_v5.7.4/plain_data/Events.txt',
  65. 'Cheetah_v5.7.4/README.txt',
  66. 'Pegasus_v2.1.1/Events_0008.nev',
  67. 'Cheetah_v6.3.2/incomplete_blocks/CSC1_reduced.ncs',
  68. 'Cheetah_v6.3.2/incomplete_blocks/Events.nev',
  69. 'Cheetah_v6.3.2/incomplete_blocks/README.txt']
  70. class TestCheetah_v551(CommonNeuralynxIOTest, unittest.TestCase):
  71. cheetah_version = '5.5.1'
  72. files_to_test = []
  73. def test_read_block(self):
  74. """Read data in a certain time range into one block"""
  75. dirname = self.get_filename_path('Cheetah_v5.5.1/original_data')
  76. nio = NeuralynxIO(dirname=dirname, use_cache=False)
  77. block = nio.read_block()
  78. # Everything put in one segment
  79. self.assertEqual(len(block.segments), 2)
  80. seg = block.segments[0]
  81. self.assertEqual(len(seg.analogsignals), 1)
  82. self.assertEqual(seg.analogsignals[0].shape[-1], 2)
  83. self.assertEqual(seg.analogsignals[0].sampling_rate, 32. * pq.kHz)
  84. self.assertEqual(len(seg.spiketrains), 2)
  85. # Testing different parameter combinations
  86. block = nio.read_block(load_waveforms=True)
  87. self.assertEqual(len(block.segments[0].analogsignals), 1)
  88. self.assertEqual(len(block.segments[0].spiketrains), 2)
  89. self.assertEqual(block.segments[0].spiketrains[0].waveforms.shape[0],
  90. block.segments[0].spiketrains[0].shape[0])
  91. self.assertGreater(len(block.segments[0].events), 0)
  92. # self.assertEqual(len(block.channel_indexes[-1].units[0].spiketrains), 2) # 2 segment
  93. # block = nio.read_block(load_waveforms=True, units_group_mode='all-in-one')
  94. # self.assertEqual(len(block.channel_indexes[-1].units), 2) # 2 units
  95. # block = nio.read_block(load_waveforms=True, units_group_mode='split-all')
  96. # self.assertEqual(len(block.channel_indexes[-1].units), 1) # 1 units by ChannelIndex
  97. def test_read_segment(self):
  98. dirname = self.get_filename_path('Cheetah_v5.5.1/original_data')
  99. nio = NeuralynxIO(dirname=dirname, use_cache=False)
  100. # read first segment entirely
  101. seg = nio.read_segment(seg_index=0, time_slice=None)
  102. self.assertEqual(len(seg.analogsignals), 1)
  103. self.assertEqual(seg.analogsignals[0].shape[-1], 2)
  104. self.assertEqual(seg.analogsignals[0].sampling_rate, 32 * pq.kHz)
  105. self.assertEqual(len(seg.spiketrains), 2)
  106. # Testing different parameter combinations
  107. seg = nio.read_segment(seg_index=0, load_waveforms=True)
  108. self.assertEqual(len(seg.analogsignals), 1)
  109. self.assertEqual(len(seg.spiketrains), 2)
  110. self.assertTrue(len(seg.spiketrains[0].waveforms) > 0)
  111. self.assertTrue(len(seg.events) > 0)
  112. class TestCheetah_v563(CommonNeuralynxIOTest, unittest.TestCase):
  113. cheetah_version = '5.6.3'
  114. files_to_test = []
  115. def test_read_block(self):
  116. """Read data in a certain time range into one block"""
  117. dirname = self.get_filename_path('Cheetah_v5.6.3/original_data')
  118. nio = NeuralynxIO(dirname=dirname, use_cache=False)
  119. block = nio.read_block()
  120. # There are two segments due to gap in recording
  121. self.assertEqual(len(block.segments), 2)
  122. for seg in block.segments:
  123. self.assertEqual(len(seg.analogsignals), 1)
  124. self.assertEqual(seg.analogsignals[0].shape[-1], 2)
  125. self.assertEqual(seg.analogsignals[0].sampling_rate, 2. * pq.kHz)
  126. self.assertEqual(len(seg.spiketrains), 8)
  127. # Testing different parameter combinations
  128. block = nio.read_block(load_waveforms=True)
  129. self.assertEqual(len(block.segments[0].analogsignals), 1)
  130. self.assertEqual(len(block.segments[0].spiketrains), 8)
  131. self.assertEqual(block.segments[0].spiketrains[0].waveforms.shape[0],
  132. block.segments[0].spiketrains[0].shape[0])
  133. # this is tetrode data, containing 32 samples per waveform
  134. self.assertEqual(block.segments[0].spiketrains[0].waveforms.shape[1], 4)
  135. self.assertEqual(block.segments[0].spiketrains[0].waveforms.shape[-1], 32)
  136. self.assertGreater(len(block.segments[0].events), 0)
  137. # self.assertEqual(len(block.channel_indexes[-1].units[0].spiketrains), 2)
  138. # block = nio.read_block(load_waveforms=True, units_group_mode='all-in-one')
  139. # self.assertEqual(len(block.channel_indexes[-1].units), 8)
  140. # block = nio.read_block(load_waveforms=True, units_group_mode='split-all')
  141. # self.assertEqual(len(block.channel_indexes[-1].units), 1) # 1 units by ChannelIndex
  142. def test_read_segment(self):
  143. dirname = self.get_filename_path('Cheetah_v5.5.1/original_data')
  144. nio = NeuralynxIO(dirname=dirname, use_cache=False)
  145. # read first segment entirely
  146. seg = nio.read_segment(seg_index=0, time_slice=None)
  147. self.assertEqual(len(seg.analogsignals), 1)
  148. self.assertEqual(seg.analogsignals[0].shape[-1], 2)
  149. self.assertEqual(seg.analogsignals[0].sampling_rate, 32 * pq.kHz)
  150. self.assertEqual(len(seg.spiketrains), 2)
  151. # Testing different parameter combinations
  152. seg = nio.read_segment(seg_index=0, load_waveforms=True)
  153. self.assertEqual(len(seg.analogsignals), 1)
  154. self.assertEqual(len(seg.spiketrains), 2)
  155. self.assertTrue(len(seg.spiketrains[0].waveforms) > 0)
  156. self.assertTrue(len(seg.events) > 0)
  157. class TestCheetah_v574(CommonNeuralynxIOTest, unittest.TestCase):
  158. cheetah_version = '5.7.4'
  159. files_to_test = []
  160. def test_read_block(self):
  161. dirname = self.get_filename_path('Cheetah_v5.7.4/original_data')
  162. nio = NeuralynxIO(dirname=dirname, use_cache=False)
  163. block = nio.read_block()
  164. # Everything put in one segment
  165. seg = block.segments[0]
  166. self.assertEqual(len(seg.analogsignals), 1)
  167. self.assertEqual(seg.analogsignals[0].shape[-1], 5)
  168. self.assertEqual(seg.analogsignals[0].sampling_rate, 32 * pq.kHz)
  169. self.assertEqual(len(seg.spiketrains), 0) # no nse files available
  170. # Testing different parameter combinations
  171. block = nio.read_block(load_waveforms=True)
  172. self.assertEqual(len(block.segments[0].analogsignals), 1)
  173. self.assertEqual(len(block.segments[0].spiketrains), 0)
  174. self.assertGreater(len(block.segments[0].events), 0)
  175. block = nio.read_block(signal_group_mode='split-all')
  176. self.assertEqual(len(block.groups), 5)
  177. block = nio.read_block(signal_group_mode='group-by-same-units')
  178. self.assertEqual(len(block.groups), 1)
  179. class TestPegasus_v211(CommonNeuralynxIOTest, unittest.TestCase):
  180. pegasus_version = '2.1.1'
  181. files_to_test = []
  182. def test_read_block(self):
  183. dirname = self.get_filename_path('Pegasus_v2.1.1')
  184. nio = NeuralynxIO(dirname=dirname, use_cache=False)
  185. block = nio.read_block()
  186. # Everything put in one segment
  187. seg = block.segments[0]
  188. self.assertEqual(len(seg.analogsignals), 0) # no ncs file available
  189. self.assertGreater(len(block.segments[0].events), 1) # single nev file available
  190. self.assertEqual(len(seg.spiketrains), 0) # no nse files available
  191. # Testing different parameter combinations
  192. block = nio.read_block(load_waveforms=True)
  193. self.assertEqual(len(block.segments[0].spiketrains), 0)
  194. self.assertGreater(len(block.segments[0].events), 1)
  195. block = nio.read_block(signal_group_mode='split-all')
  196. self.assertEqual(len(block.channel_indexes), 0)
  197. block = nio.read_block(signal_group_mode='group-by-same-units')
  198. self.assertEqual(len(block.channel_indexes), 0)
  199. class TestData(CommonNeuralynxIOTest, unittest.TestCase):
  200. # def test_ncs(self):
  201. # for session in self.files_to_test[1:2]: # in the long run this should include all files
  202. # dirname = self.get_filename_path(session)
  203. # nio = NeuralynxIO(dirname=dirname, use_cache=False)
  204. # block = nio.read_block()
  205. # for anasig_id, anasig in enumerate(block.segments[0].analogsignals):
  206. # chid = anasig.channel_index.channel_ids[anasig_id]
  207. # # need to decode, unless keyerror
  208. # chname = anasig.channel_index.channel_names[anasig_id]
  209. # chuid = (chname, chid)
  210. # filename = nio.ncs_filenames[chuid][:-3] + 'txt'
  211. # filename = filename.replace('original_data', 'plain_data')
  212. # plain_data = np.loadtxt(filename)[:, 5:].flatten() # first columns are meta info
  213. # overlap = 512 * 500
  214. # gain_factor_0 = plain_data[0] / anasig.magnitude[0, 0]
  215. # np.testing.assert_allclose(plain_data[:overlap],
  216. # anasig.magnitude[:overlap, 0] * gain_factor_0,
  217. # rtol=0.01)
  218. def test_keep_original_spike_times(self):
  219. for session in self.files_to_test:
  220. dirname = self.get_filename_path(session)
  221. nio = NeuralynxIO(dirname=dirname, keep_original_times=True)
  222. block = nio.read_block()
  223. for st in block.segments[0].spiketrains:
  224. filename = st.file_origin.replace('original_data', 'plain_data')
  225. if '.nse' in st.file_origin:
  226. filename = filename.replace('.nse', '.txt')
  227. times_column = 0
  228. plain_data = np.loadtxt(filename)[:, times_column]
  229. elif '.ntt' in st.file_origin:
  230. filename = filename.replace('.ntt', '.txt')
  231. times_column = 2
  232. plain_data = np.loadtxt(filename)[:, times_column]
  233. # ntt files contain 4 rows per spike time
  234. plain_data = plain_data[::4]
  235. times = st.rescale(pq.microsecond).magnitude
  236. overlap = min(len(plain_data), len(times))
  237. np.testing.assert_allclose(plain_data[:overlap], times[:overlap], rtol=1e-10)
  238. class TestIncompleteBlocks(CommonNeuralynxIOTest, unittest.TestCase):
  239. def test_incomplete_block_handling_v632(self):
  240. dirname = self.get_filename_path('Cheetah_v6.3.2/incomplete_blocks')
  241. nio = NeuralynxIO(dirname=dirname, use_cache=False)
  242. block = nio.read_block()
  243. # known gap values
  244. n_gaps = 2
  245. # so 3 segments, 3 anasigs by Channelindex
  246. self.assertEqual(len(block.segments), n_gaps + 1)
  247. # self.assertEqual(len(block.channel_indexes[0].analogsignals), n_gaps + 1)
  248. for t, gt in zip(nio._sigs_t_start, [8408.806811, 8427.832053, 8487.768561]):
  249. self.assertEqual(np.round(t, 4), np.round(gt, 4))
  250. for t, gt in zip(nio._sigs_t_stop, [8427.830803, 8487.768029, 8515.816549]):
  251. self.assertEqual(np.round(t, 4), np.round(gt, 4))
  252. class TestGaps(CommonNeuralynxIOTest, unittest.TestCase):
  253. def test_gap_handling_v551(self):
  254. dirname = self.get_filename_path('Cheetah_v5.5.1/original_data')
  255. nio = NeuralynxIO(dirname=dirname, use_cache=False)
  256. block = nio.read_block()
  257. # known gap values
  258. n_gaps = 1
  259. # so 2 segments, 2 anasigs by Channelindex, 2 SpikeTrain by Units
  260. self.assertEqual(len(block.segments), n_gaps + 1)
  261. # self.assertEqual(len(block.channel_indexes[0].analogsignals), n_gaps + 1)
  262. # self.assertEqual(len(block.channel_indexes[-1].units[0].spiketrains), n_gaps + 1)
  263. def test_gap_handling_v563(self):
  264. dirname = self.get_filename_path('Cheetah_v5.6.3/original_data')
  265. nio = NeuralynxIO(dirname=dirname, use_cache=False)
  266. block = nio.read_block()
  267. # known gap values
  268. n_gaps = 1
  269. # so 2 segments, 2 anasigs by Channelindex, 2 SpikeTrain by Units
  270. self.assertEqual(len(block.segments), n_gaps + 1)
  271. # self.assertEqual(len(block.channel_indexes[0].analogsignals), n_gaps + 1)
  272. # self.assertEqual(len(block.channel_indexes[-1].units[0].spiketrains), n_gaps + 1)
  273. def compare_old_and_new_neuralynxio():
  274. base = '/tmp/files_for_testing_neo/neuralynx/'
  275. dirname = base + 'Cheetah_v5.5.1/original_data/'
  276. # ~ dirname = base+'Cheetah_v5.7.4/original_data/'
  277. t0 = time.perf_counter()
  278. newreader = NewNeuralynxIO(dirname)
  279. t1 = time.perf_counter()
  280. bl1 = newreader.read_block(load_waveforms=True)
  281. t2 = time.perf_counter()
  282. print('newreader header', t1 - t0, 's')
  283. print('newreader data', t2 - t1, 's')
  284. print('newreader toal', t2 - t0, 's')
  285. for seg in bl1.segments:
  286. print('seg', seg.index)
  287. for anasig in seg.analogsignals:
  288. print(' AnalogSignal', anasig.name, anasig.shape, anasig.t_start)
  289. for st in seg.spiketrains:
  290. print(' SpikeTrain', st.name, st.shape, st.waveforms.shape, st[:5])
  291. for ev in seg.events:
  292. print(' Event', ev.name, ev.times.shape)
  293. print('*' * 10)
  294. t0 = time.perf_counter()
  295. oldreader = OldNeuralynxIO(sessiondir=dirname, use_cache='never')
  296. t1 = time.perf_counter()
  297. bl2 = oldreader.read_block(waveforms=True, events=True)
  298. t2 = time.perf_counter()
  299. print('oldreader header', t1 - t0, 's')
  300. print('oldreader data', t2 - t1, 's')
  301. print('oldreader toal', t2 - t0, 's')
  302. for seg in bl2.segments:
  303. print('seg', seg.index)
  304. for anasig in seg.analogsignals:
  305. print(' AnalogSignal', anasig.name, anasig.shape, anasig.t_start)
  306. for st in seg.spiketrains:
  307. print(' SpikeTrain', st.name, st.shape, st.waveforms.shape, st[:5])
  308. for ev in seg.events:
  309. print(' Event', ev.name, ev.times.shape)
  310. print('*' * 10)
  311. compare_neo_content(bl1, bl2)
  312. def compare_neo_content(bl1, bl2):
  313. print('*' * 5, 'Comparison of blocks', '*' * 5)
  314. object_types_to_test = [Segment, ChannelIndex, Unit, AnalogSignal,
  315. SpikeTrain, Event, Epoch]
  316. for objtype in object_types_to_test:
  317. print('Testing {}'.format(objtype))
  318. children1 = bl1.list_children_by_class(objtype)
  319. children2 = bl2.list_children_by_class(objtype)
  320. if len(children1) != len(children2):
  321. warnings.warn('Number of {} is different in both blocks ({} != {}).'
  322. ' Skipping comparison'
  323. ''.format(objtype, len(children1), len(children2)))
  324. continue
  325. for child1, child2 in zip(children1, children2):
  326. compare_annotations(child1.annotations, child2.annotations)
  327. compare_attributes(child1, child2)
  328. def compare_annotations(anno1, anno2):
  329. if len(anno1) != len(anno2):
  330. warnings.warn('Different numbers of annotations! {} != {}\nSkipping further comparison of '
  331. 'this annotation list.'.format(anno1.keys(), anno2.keys()))
  332. return
  333. assert anno1.keys() == anno2.keys()
  334. for key in anno1.keys():
  335. anno1[key] = anno2[key]
  336. def compare_attributes(child1, child2):
  337. assert child1._all_attrs == child2._all_attrs
  338. for attr_id in range(len(child1._all_attrs)):
  339. attr_name = child1._all_attrs[attr_id][0]
  340. attr_dtype = child1._all_attrs[attr_id][1]
  341. if type(child1) == AnalogSignal and attr_name == 'signal':
  342. continue
  343. if type(child1) == SpikeTrain and attr_name == 'times':
  344. continue
  345. unequal = child1.__getattribute__(attr_name) != child2.__getattribute__(attr_name)
  346. if hasattr(unequal, 'any'):
  347. unequal = unequal.any()
  348. if unequal:
  349. warnings.warn('Attributes differ! {}.{}={} is not equal to {}.{}={}'
  350. ''.format(child1.__class__.__name__, attr_name,
  351. child1.__getattribute__(attr_name),
  352. child2.__class__.__name__, attr_name,
  353. child2.__getattribute__(attr_name)))
  354. if __name__ == '__main__':
  355. unittest.main()
  356. # ~ compare_old_and_new_neuralynxio()