test_neuralynxio.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440
  1. """
  2. Tests of neo.io.neuralynxio.py
  3. """
  4. import time
  5. import warnings
  6. import unittest
  7. import numpy as np
  8. import quantities as pq
  9. from neo.test.iotest.common_io_test import BaseTestIO
  10. from neo.core import *
  11. from neo.io.neuralynxio import NeuralynxIO
  12. from neo.io.neuralynxio import NeuralynxIO as NewNeuralynxIO
  13. from neo.io.neuralynxio_v1 import NeuralynxIO as OldNeuralynxIO
  14. from neo import AnalogSignal
  15. class CommonNeuralynxIOTest(BaseTestIO, unittest.TestCase, ):
  16. ioclass = NeuralynxIO
  17. files_to_test = [
  18. # 'Cheetah_v4.0.2/original_data',
  19. 'Cheetah_v5.5.1/original_data',
  20. 'Cheetah_v5.6.3/original_data',
  21. 'Cheetah_v5.7.4/original_data',
  22. 'Pegasus_v2.1.1',
  23. 'Cheetah_v6.3.2/incomplete_blocks']
  24. files_to_download = [
  25. 'Cheetah_v4.0.2/original_data/CSC14_trunc.Ncs',
  26. 'Cheetah_v5.5.1/original_data/CheetahLogFile.txt',
  27. 'Cheetah_v5.5.1/original_data/CheetahLostADRecords.txt',
  28. 'Cheetah_v5.5.1/original_data/Events.nev',
  29. 'Cheetah_v5.5.1/original_data/STet3a.nse',
  30. 'Cheetah_v5.5.1/original_data/STet3b.nse',
  31. 'Cheetah_v5.5.1/original_data/Tet3a.ncs',
  32. 'Cheetah_v5.5.1/original_data/Tet3b.ncs',
  33. 'Cheetah_v5.5.1/plain_data/STet3a.txt',
  34. 'Cheetah_v5.5.1/plain_data/STet3b.txt',
  35. 'Cheetah_v5.5.1/plain_data/Tet3a.txt',
  36. 'Cheetah_v5.5.1/plain_data/Tet3b.txt',
  37. 'Cheetah_v5.5.1/plain_data/Events.txt',
  38. 'Cheetah_v5.5.1/README.txt',
  39. 'Cheetah_v5.6.3/original_data/CheetahLogFile.txt',
  40. 'Cheetah_v5.6.3/original_data/CheetahLostADRecords.txt',
  41. 'Cheetah_v5.6.3/original_data/Events.nev',
  42. 'Cheetah_v5.6.3/original_data/CSC1.ncs',
  43. 'Cheetah_v5.6.3/original_data/CSC2.ncs',
  44. 'Cheetah_v5.6.3/original_data/TT1.ntt',
  45. 'Cheetah_v5.6.3/original_data/TT2.ntt',
  46. 'Cheetah_v5.6.3/original_data/VT1.nvt',
  47. 'Cheetah_v5.6.3/plain_data/Events.txt',
  48. 'Cheetah_v5.6.3/plain_data/CSC1.txt',
  49. 'Cheetah_v5.6.3/plain_data/CSC2.txt',
  50. 'Cheetah_v5.6.3/plain_data/TT1.txt',
  51. 'Cheetah_v5.6.3/plain_data/TT2.txt',
  52. 'Cheetah_v5.6.3/original_data/VT1.nvt',
  53. 'Cheetah_v5.7.4/original_data/CSC1.ncs',
  54. 'Cheetah_v5.7.4/original_data/CSC2.ncs',
  55. 'Cheetah_v5.7.4/original_data/CSC3.ncs',
  56. 'Cheetah_v5.7.4/original_data/CSC4.ncs',
  57. 'Cheetah_v5.7.4/original_data/CSC5.ncs',
  58. 'Cheetah_v5.7.4/original_data/Events.nev',
  59. 'Cheetah_v5.7.4/plain_data/CSC1.txt',
  60. 'Cheetah_v5.7.4/plain_data/CSC2.txt',
  61. 'Cheetah_v5.7.4/plain_data/CSC3.txt',
  62. 'Cheetah_v5.7.4/plain_data/CSC4.txt',
  63. 'Cheetah_v5.7.4/plain_data/CSC5.txt',
  64. 'Cheetah_v5.7.4/plain_data/Events.txt',
  65. 'Cheetah_v5.7.4/README.txt',
  66. 'Pegasus_v2.1.1/Events_0008.nev',
  67. 'Cheetah_v6.3.2/incomplete_blocks/CSC1_reduced.ncs',
  68. 'Cheetah_v6.3.2/incomplete_blocks/Events.nev',
  69. 'Cheetah_v6.3.2/incomplete_blocks/README.txt']
  70. class TestCheetah_v551(CommonNeuralynxIOTest, unittest.TestCase):
  71. cheetah_version = '5.5.1'
  72. files_to_test = []
  73. def test_read_block(self):
  74. """Read data in a certain time range into one block"""
  75. dirname = self.get_filename_path('Cheetah_v5.5.1/original_data')
  76. nio = NeuralynxIO(dirname=dirname, use_cache=False)
  77. block = nio.read_block()
  78. # Everything put in one segment
  79. self.assertEqual(len(block.segments), 2)
  80. seg = block.segments[0]
  81. self.assertEqual(len(seg.analogsignals), 1)
  82. self.assertEqual(seg.analogsignals[0].shape[-1], 2)
  83. self.assertEqual(seg.analogsignals[0].sampling_rate, 32. * pq.kHz)
  84. self.assertEqual(len(seg.spiketrains), 2)
  85. # Testing different parameter combinations
  86. block = nio.read_block(load_waveforms=True)
  87. self.assertEqual(len(block.segments[0].analogsignals), 1)
  88. self.assertEqual(len(block.segments[0].spiketrains), 2)
  89. self.assertEqual(block.segments[0].spiketrains[0].waveforms.shape[0],
  90. block.segments[0].spiketrains[0].shape[0])
  91. self.assertGreater(len(block.segments[0].events), 0)
  92. # self.assertEqual(len(block.channel_indexes[-1].units[0].spiketrains), 2) # 2 segment
  93. # block = nio.read_block(load_waveforms=True, units_group_mode='all-in-one')
  94. # self.assertEqual(len(block.channel_indexes[-1].units), 2) # 2 units
  95. # block = nio.read_block(load_waveforms=True, units_group_mode='split-all')
  96. # self.assertEqual(len(block.channel_indexes[-1].units), 1) # 1 units by ChannelIndex
  97. def test_read_segment(self):
  98. dirname = self.get_filename_path('Cheetah_v5.5.1/original_data')
  99. nio = NeuralynxIO(dirname=dirname, use_cache=False)
  100. # read first segment entirely
  101. seg = nio.read_segment(seg_index=0, time_slice=None)
  102. self.assertEqual(len(seg.analogsignals), 1)
  103. self.assertEqual(seg.analogsignals[0].shape[-1], 2)
  104. self.assertEqual(seg.analogsignals[0].sampling_rate, 32 * pq.kHz)
  105. self.assertEqual(len(seg.spiketrains), 2)
  106. # Testing different parameter combinations
  107. seg = nio.read_segment(seg_index=0, load_waveforms=True)
  108. self.assertEqual(len(seg.analogsignals), 1)
  109. self.assertEqual(len(seg.spiketrains), 2)
  110. self.assertTrue(len(seg.spiketrains[0].waveforms) > 0)
  111. self.assertTrue(len(seg.events) > 0)
  112. class TestCheetah_v563(CommonNeuralynxIOTest, unittest.TestCase):
  113. cheetah_version = '5.6.3'
  114. files_to_test = []
  115. def test_read_block(self):
  116. """Read data in a certain time range into one block"""
  117. dirname = self.get_filename_path('Cheetah_v5.6.3/original_data')
  118. nio = NeuralynxIO(dirname=dirname, use_cache=False)
  119. block = nio.read_block()
  120. # There are two segments due to gap in recording
  121. self.assertEqual(len(block.segments), 2)
  122. for seg in block.segments:
  123. self.assertEqual(len(seg.analogsignals), 1)
  124. self.assertEqual(seg.analogsignals[0].shape[-1], 2)
  125. self.assertEqual(seg.analogsignals[0].sampling_rate, 2. * pq.kHz)
  126. self.assertEqual(len(seg.spiketrains), 8)
  127. # Testing different parameter combinations
  128. block = nio.read_block(load_waveforms=True)
  129. self.assertEqual(len(block.segments[0].analogsignals), 1)
  130. self.assertEqual(len(block.segments[0].spiketrains), 8)
  131. self.assertEqual(block.segments[0].spiketrains[0].waveforms.shape[0],
  132. block.segments[0].spiketrains[0].shape[0])
  133. # this is tetrode data, containing 32 samples per waveform
  134. self.assertEqual(block.segments[0].spiketrains[0].waveforms.shape[1], 4)
  135. self.assertEqual(block.segments[0].spiketrains[0].waveforms.shape[-1], 32)
  136. self.assertGreater(len(block.segments[0].events), 0)
  137. # self.assertEqual(len(block.channel_indexes[-1].units[0].spiketrains), 2)
  138. # block = nio.read_block(load_waveforms=True, units_group_mode='all-in-one')
  139. # self.assertEqual(len(block.channel_indexes[-1].units), 8)
  140. # block = nio.read_block(load_waveforms=True, units_group_mode='split-all')
  141. # self.assertEqual(len(block.channel_indexes[-1].units), 1) # 1 units by ChannelIndex
  142. def test_read_segment(self):
  143. dirname = self.get_filename_path('Cheetah_v5.5.1/original_data')
  144. nio = NeuralynxIO(dirname=dirname, use_cache=False)
  145. # read first segment entirely
  146. seg = nio.read_segment(seg_index=0, time_slice=None)
  147. self.assertEqual(len(seg.analogsignals), 1)
  148. self.assertEqual(seg.analogsignals[0].shape[-1], 2)
  149. self.assertEqual(seg.analogsignals[0].sampling_rate, 32 * pq.kHz)
  150. self.assertEqual(len(seg.spiketrains), 2)
  151. # Testing different parameter combinations
  152. seg = nio.read_segment(seg_index=0, load_waveforms=True)
  153. self.assertEqual(len(seg.analogsignals), 1)
  154. self.assertEqual(len(seg.spiketrains), 2)
  155. self.assertTrue(len(seg.spiketrains[0].waveforms) > 0)
  156. self.assertTrue(len(seg.events) > 0)
  157. class TestCheetah_v574(CommonNeuralynxIOTest, unittest.TestCase):
  158. cheetah_version = '5.7.4'
  159. files_to_test = []
  160. def test_read_block(self):
  161. dirname = self.get_filename_path('Cheetah_v5.7.4/original_data')
  162. nio = NeuralynxIO(dirname=dirname, use_cache=False)
  163. block = nio.read_block()
  164. # Everything put in one segment
  165. seg = block.segments[0]
  166. self.assertEqual(len(seg.analogsignals), 1)
  167. self.assertEqual(seg.analogsignals[0].shape[-1], 5)
  168. self.assertEqual(seg.analogsignals[0].sampling_rate, 32 * pq.kHz)
  169. self.assertEqual(len(seg.spiketrains), 0) # no nse files available
  170. # Testing different parameter combinations
  171. block = nio.read_block(load_waveforms=True)
  172. self.assertEqual(len(block.segments[0].analogsignals), 1)
  173. self.assertEqual(len(block.segments[0].spiketrains), 0)
  174. self.assertGreater(len(block.segments[0].events), 0)
  175. block = nio.read_block(signal_group_mode='split-all')
  176. self.assertEqual(len(block.groups), 5)
  177. block = nio.read_block(signal_group_mode='group-by-same-units')
  178. self.assertEqual(len(block.groups), 1)
  179. class TestPegasus_v211(CommonNeuralynxIOTest, unittest.TestCase):
  180. pegasus_version = '2.1.1'
  181. files_to_test = []
  182. def test_read_block(self):
  183. dirname = self.get_filename_path('Pegasus_v2.1.1')
  184. nio = NeuralynxIO(dirname=dirname, use_cache=False)
  185. block = nio.read_block()
  186. # Everything put in one segment
  187. seg = block.segments[0]
  188. self.assertEqual(len(seg.analogsignals), 0) # no ncs file available
  189. self.assertGreater(len(block.segments[0].events), 1) # single nev file available
  190. self.assertEqual(len(seg.spiketrains), 0) # no nse files available
  191. # Testing different parameter combinations
  192. block = nio.read_block(load_waveforms=True)
  193. self.assertEqual(len(block.segments[0].spiketrains), 0)
  194. self.assertGreater(len(block.segments[0].events), 1)
  195. block = nio.read_block(signal_group_mode='split-all')
  196. self.assertEqual(len(block.channel_indexes), 0)
  197. block = nio.read_block(signal_group_mode='group-by-same-units')
  198. self.assertEqual(len(block.channel_indexes), 0)
  199. class TestData(CommonNeuralynxIOTest, unittest.TestCase):
  200. # def test_ncs(self):
  201. # for session in self.files_to_test[1:2]: # in the long run this should include all files
  202. # dirname = self.get_filename_path(session)
  203. # nio = NeuralynxIO(dirname=dirname, use_cache=False)
  204. # block = nio.read_block()
  205. # for anasig_id, anasig in enumerate(block.segments[0].analogsignals):
  206. # chid = anasig.channel_index.channel_ids[anasig_id]
  207. # # need to decode, unless keyerror
  208. # chname = anasig.channel_index.channel_names[anasig_id]
  209. # chuid = (chname, chid)
  210. # filename = nio.ncs_filenames[chuid][:-3] + 'txt'
  211. # filename = filename.replace('original_data', 'plain_data')
  212. # plain_data = np.loadtxt(filename)[:, 5:].flatten() # first columns are meta info
  213. # overlap = 512 * 500
  214. # gain_factor_0 = plain_data[0] / anasig.magnitude[0, 0]
  215. # np.testing.assert_allclose(plain_data[:overlap],
  216. # anasig.magnitude[:overlap, 0] * gain_factor_0,
  217. # rtol=0.01)
  218. def test_keep_original_spike_times(self):
  219. for session in self.files_to_test:
  220. dirname = self.get_filename_path(session)
  221. nio = NeuralynxIO(dirname=dirname, keep_original_times=True)
  222. block = nio.read_block()
  223. for st in block.segments[0].spiketrains:
  224. filename = st.file_origin.replace('original_data', 'plain_data')
  225. if '.nse' in st.file_origin:
  226. filename = filename.replace('.nse', '.txt')
  227. times_column = 0
  228. plain_data = np.loadtxt(filename)[:, times_column]
  229. elif '.ntt' in st.file_origin:
  230. filename = filename.replace('.ntt', '.txt')
  231. times_column = 2
  232. plain_data = np.loadtxt(filename)[:, times_column]
  233. # ntt files contain 4 rows per spike time
  234. plain_data = plain_data[::4]
  235. times = st.rescale(pq.microsecond).magnitude
  236. overlap = min(len(plain_data), len(times))
  237. np.testing.assert_allclose(plain_data[:overlap], times[:overlap], rtol=1e-10)
  238. class TestIncompleteBlocks(CommonNeuralynxIOTest, unittest.TestCase):
  239. def test_incomplete_block_handling_v632(self):
  240. dirname = self.get_filename_path('Cheetah_v6.3.2/incomplete_blocks')
  241. nio = NeuralynxIO(dirname=dirname, use_cache=False)
  242. block = nio.read_block()
  243. # known gap values
  244. n_gaps = 2
  245. # so 3 segments, 3 anasigs by Channelindex
  246. self.assertEqual(len(block.segments), n_gaps + 1)
  247. # self.assertEqual(len(block.channel_indexes[0].analogsignals), n_gaps + 1)
  248. for t, gt in zip(nio._sigs_t_start, [8408.806811, 8427.832053, 8487.768561]):
  249. self.assertEqual(np.round(t, 4), np.round(gt, 4))
  250. for t, gt in zip(nio._sigs_t_stop, [8427.830803, 8487.768029, 8515.816549]):
  251. self.assertEqual(np.round(t, 4), np.round(gt, 4))
  252. class TestGaps(CommonNeuralynxIOTest, unittest.TestCase):
  253. def test_gap_handling_v551(self):
  254. dirname = self.get_filename_path('Cheetah_v5.5.1/original_data')
  255. nio = NeuralynxIO(dirname=dirname, use_cache=False)
  256. block = nio.read_block()
  257. # known gap values
  258. n_gaps = 1
  259. # so 2 segments, 2 anasigs by Channelindex, 2 SpikeTrain by Units
  260. self.assertEqual(len(block.segments), n_gaps + 1)
  261. # self.assertEqual(len(block.channel_indexes[0].analogsignals), n_gaps + 1)
  262. # self.assertEqual(len(block.channel_indexes[-1].units[0].spiketrains), n_gaps + 1)
  263. def test_gap_handling_v563(self):
  264. dirname = self.get_filename_path('Cheetah_v5.6.3/original_data')
  265. nio = NeuralynxIO(dirname=dirname, use_cache=False)
  266. block = nio.read_block()
  267. # known gap values
  268. n_gaps = 1
  269. # so 2 segments, 2 anasigs by Channelindex, 2 SpikeTrain by Units
  270. self.assertEqual(len(block.segments), n_gaps + 1)
  271. # self.assertEqual(len(block.channel_indexes[0].analogsignals), n_gaps + 1)
  272. # self.assertEqual(len(block.channel_indexes[-1].units[0].spiketrains), n_gaps + 1)
  273. def compare_old_and_new_neuralynxio():
  274. base = '/tmp/files_for_testing_neo/neuralynx/'
  275. dirname = base + 'Cheetah_v5.5.1/original_data/'
  276. # ~ dirname = base+'Cheetah_v5.7.4/original_data/'
  277. t0 = time.perf_counter()
  278. newreader = NewNeuralynxIO(dirname)
  279. t1 = time.perf_counter()
  280. bl1 = newreader.read_block(load_waveforms=True)
  281. t2 = time.perf_counter()
  282. print('newreader header', t1 - t0, 's')
  283. print('newreader data', t2 - t1, 's')
  284. print('newreader toal', t2 - t0, 's')
  285. for seg in bl1.segments:
  286. print('seg', seg.index)
  287. for anasig in seg.analogsignals:
  288. print(' AnalogSignal', anasig.name, anasig.shape, anasig.t_start)
  289. for st in seg.spiketrains:
  290. print(' SpikeTrain', st.name, st.shape, st.waveforms.shape, st[:5])
  291. for ev in seg.events:
  292. print(' Event', ev.name, ev.times.shape)
  293. print('*' * 10)
  294. t0 = time.perf_counter()
  295. oldreader = OldNeuralynxIO(sessiondir=dirname, use_cache='never')
  296. t1 = time.perf_counter()
  297. bl2 = oldreader.read_block(waveforms=True, events=True)
  298. t2 = time.perf_counter()
  299. print('oldreader header', t1 - t0, 's')
  300. print('oldreader data', t2 - t1, 's')
  301. print('oldreader toal', t2 - t0, 's')
  302. for seg in bl2.segments:
  303. print('seg', seg.index)
  304. for anasig in seg.analogsignals:
  305. print(' AnalogSignal', anasig.name, anasig.shape, anasig.t_start)
  306. for st in seg.spiketrains:
  307. print(' SpikeTrain', st.name, st.shape, st.waveforms.shape, st[:5])
  308. for ev in seg.events:
  309. print(' Event', ev.name, ev.times.shape)
  310. print('*' * 10)
  311. compare_neo_content(bl1, bl2)
  312. def compare_neo_content(bl1, bl2):
  313. print('*' * 5, 'Comparison of blocks', '*' * 5)
  314. object_types_to_test = [Segment, ChannelIndex, Unit, AnalogSignal,
  315. SpikeTrain, Event, Epoch]
  316. for objtype in object_types_to_test:
  317. print('Testing {}'.format(objtype))
  318. children1 = bl1.list_children_by_class(objtype)
  319. children2 = bl2.list_children_by_class(objtype)
  320. if len(children1) != len(children2):
  321. warnings.warn('Number of {} is different in both blocks ({} != {}).'
  322. ' Skipping comparison'
  323. ''.format(objtype, len(children1), len(children2)))
  324. continue
  325. for child1, child2 in zip(children1, children2):
  326. compare_annotations(child1.annotations, child2.annotations)
  327. compare_attributes(child1, child2)
  328. def compare_annotations(anno1, anno2):
  329. if len(anno1) != len(anno2):
  330. warnings.warn('Different numbers of annotations! {} != {}\nSkipping further comparison of '
  331. 'this annotation list.'.format(anno1.keys(), anno2.keys()))
  332. return
  333. assert anno1.keys() == anno2.keys()
  334. for key in anno1.keys():
  335. anno1[key] = anno2[key]
  336. def compare_attributes(child1, child2):
  337. assert child1._all_attrs == child2._all_attrs
  338. for attr_id in range(len(child1._all_attrs)):
  339. attr_name = child1._all_attrs[attr_id][0]
  340. attr_dtype = child1._all_attrs[attr_id][1]
  341. if type(child1) == AnalogSignal and attr_name == 'signal':
  342. continue
  343. if type(child1) == SpikeTrain and attr_name == 'times':
  344. continue
  345. unequal = child1.__getattribute__(attr_name) != child2.__getattribute__(attr_name)
  346. if hasattr(unequal, 'any'):
  347. unequal = unequal.any()
  348. if unequal:
  349. warnings.warn('Attributes differ! {}.{}={} is not equal to {}.{}={}'
  350. ''.format(child1.__class__.__name__, attr_name,
  351. child1.__getattribute__(attr_name),
  352. child2.__class__.__name__, attr_name,
  353. child2.__getattribute__(attr_name)))
  354. if __name__ == '__main__':
  355. unittest.main()
  356. # ~ compare_old_and_new_neuralynxio()