test_neuralynxio.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382
  1. # -*- coding: utf-8 -*-
  2. """
  3. Tests of neo.io.blackrockio
  4. """
  5. # needed for python 3 compatibility
  6. from __future__ import absolute_import
  7. import time
  8. import warnings
  9. import unittest
  10. import numpy as np
  11. import quantities as pq
  12. from neo.test.iotest.common_io_test import BaseTestIO
  13. from neo.core import *
  14. from neo.io.neuralynxio import NeuralynxIO
  15. from neo.io.neuralynxio import NeuralynxIO as NewNeuralynxIO
  16. from neo.io.neuralynxio_v1 import NeuralynxIO as OldNeuralynxIO
  17. from neo import AnalogSignal
  18. class CommonNeuralynxIOTest(BaseTestIO, unittest.TestCase, ):
  19. ioclass = NeuralynxIO
  20. files_to_test = [
  21. 'Cheetah_v5.5.1/original_data',
  22. 'Cheetah_v5.6.3/original_data',
  23. 'Cheetah_v5.7.4/original_data',
  24. ]
  25. files_to_download = [
  26. 'Cheetah_v5.5.1/original_data/CheetahLogFile.txt',
  27. 'Cheetah_v5.5.1/original_data/CheetahLostADRecords.txt',
  28. 'Cheetah_v5.5.1/original_data/Events.nev',
  29. 'Cheetah_v5.5.1/original_data/STet3a.nse',
  30. 'Cheetah_v5.5.1/original_data/STet3b.nse',
  31. 'Cheetah_v5.5.1/original_data/Tet3a.ncs',
  32. 'Cheetah_v5.5.1/original_data/Tet3b.ncs',
  33. 'Cheetah_v5.5.1/plain_data/STet3a.txt',
  34. 'Cheetah_v5.5.1/plain_data/STet3b.txt',
  35. 'Cheetah_v5.5.1/plain_data/Tet3a.txt',
  36. 'Cheetah_v5.5.1/plain_data/Tet3b.txt',
  37. 'Cheetah_v5.5.1/plain_data/Events.txt',
  38. 'Cheetah_v5.5.1/README.txt',
  39. 'Cheetah_v5.6.3/original_data/CheetahLogFile.txt',
  40. 'Cheetah_v5.6.3/original_data/CheetahLostADRecords.txt',
  41. 'Cheetah_v5.6.3/original_data/Events.nev',
  42. 'Cheetah_v5.6.3/original_data/CSC1.ncs',
  43. 'Cheetah_v5.6.3/original_data/CSC2.ncs',
  44. 'Cheetah_v5.6.3/original_data/TT1.ntt',
  45. 'Cheetah_v5.6.3/original_data/TT2.ntt',
  46. 'Cheetah_v5.6.3/original_data/VT1.nvt',
  47. 'Cheetah_v5.6.3/plain_data/Events.txt',
  48. 'Cheetah_v5.6.3/plain_data/CSC1.txt',
  49. 'Cheetah_v5.6.3/plain_data/CSC2.txt',
  50. 'Cheetah_v5.6.3/plain_data/TT1.txt',
  51. 'Cheetah_v5.6.3/plain_data/TT2.txt',
  52. 'Cheetah_v5.6.3/original_data/VT1.nvt',
  53. 'Cheetah_v5.7.4/original_data/CSC1.ncs',
  54. 'Cheetah_v5.7.4/original_data/CSC2.ncs',
  55. 'Cheetah_v5.7.4/original_data/CSC3.ncs',
  56. 'Cheetah_v5.7.4/original_data/CSC4.ncs',
  57. 'Cheetah_v5.7.4/original_data/CSC5.ncs',
  58. 'Cheetah_v5.7.4/original_data/Events.nev',
  59. 'Cheetah_v5.7.4/plain_data/CSC1.txt',
  60. 'Cheetah_v5.7.4/plain_data/CSC2.txt',
  61. 'Cheetah_v5.7.4/plain_data/CSC3.txt',
  62. 'Cheetah_v5.7.4/plain_data/CSC4.txt',
  63. 'Cheetah_v5.7.4/plain_data/CSC5.txt',
  64. 'Cheetah_v5.7.4/plain_data/Events.txt',
  65. 'Cheetah_v5.7.4/README.txt']
  66. class TestCheetah_v551(CommonNeuralynxIOTest, unittest.TestCase):
  67. cheetah_version = '5.5.1'
  68. files_to_test = []
  69. def test_read_block(self):
  70. """Read data in a certain time range into one block"""
  71. dirname = self.get_filename_path('Cheetah_v5.5.1/original_data')
  72. nio = NeuralynxIO(dirname=dirname, use_cache=False)
  73. block = nio.read_block()
  74. # Everything put in one segment
  75. self.assertEqual(len(block.segments), 2)
  76. seg = block.segments[0]
  77. self.assertEqual(len(seg.analogsignals), 1)
  78. self.assertEqual(seg.analogsignals[0].shape[-1], 2)
  79. self.assertEqual(seg.analogsignals[0].sampling_rate, 32. * pq.kHz)
  80. self.assertEqual(len(seg.spiketrains), 2)
  81. # Testing different parameter combinations
  82. block = nio.read_block(lazy=True)
  83. self.assertEqual(len(block.segments[0].analogsignals[0]), 0)
  84. self.assertEqual(len(block.segments[0].spiketrains[0]), 0)
  85. block = nio.read_block(load_waveforms=True)
  86. self.assertEqual(len(block.segments[0].analogsignals), 1)
  87. self.assertEqual(len(block.segments[0].spiketrains), 2)
  88. self.assertEqual(block.segments[0].spiketrains[0].waveforms.shape[0],
  89. block.segments[0].spiketrains[0].shape[0])
  90. self.assertGreater(len(block.segments[0].events), 0)
  91. self.assertEqual(len(block.channel_indexes[-1].units[0].spiketrains), 2) # 2 segment
  92. block = nio.read_block(load_waveforms=True, units_group_mode='all-in-one')
  93. self.assertEqual(len(block.channel_indexes[-1].units), 2) # 2 units
  94. block = nio.read_block(load_waveforms=True, units_group_mode='split-all')
  95. self.assertEqual(len(block.channel_indexes[-1].units), 1) # 1 units by ChannelIndex
  96. def test_read_segment(self):
  97. dirname = self.get_filename_path('Cheetah_v5.5.1/original_data')
  98. nio = NeuralynxIO(dirname=dirname, use_cache=False)
  99. # read first segment entirely
  100. seg = nio.read_segment(seg_index=0, time_slice=None)
  101. self.assertEqual(len(seg.analogsignals), 1)
  102. self.assertEqual(seg.analogsignals[0].shape[-1], 2)
  103. self.assertEqual(seg.analogsignals[0].sampling_rate, 32 * pq.kHz)
  104. self.assertEqual(len(seg.spiketrains), 2)
  105. # Testing different parameter combinations
  106. seg = nio.read_segment(seg_index=0, lazy=True)
  107. self.assertEqual(seg.analogsignals[0].size, 0)
  108. self.assertEqual(seg.spiketrains[0].size, 0)
  109. seg = nio.read_segment(seg_index=0, load_waveforms=True)
  110. self.assertEqual(len(seg.analogsignals), 1)
  111. self.assertEqual(len(seg.spiketrains), 2)
  112. self.assertTrue(len(seg.spiketrains[0].waveforms) > 0)
  113. self.assertTrue(len(seg.events) > 0)
  114. class TestCheetah_v563(CommonNeuralynxIOTest, unittest.TestCase):
  115. cheetah_version = '5.6.3'
  116. files_to_test = []
  117. def test_read_block(self):
  118. """Read data in a certain time range into one block"""
  119. dirname = self.get_filename_path('Cheetah_v5.6.3/original_data')
  120. nio = NeuralynxIO(dirname=dirname, use_cache=False)
  121. block = nio.read_block()
  122. # There are two segments due to gap in recording
  123. self.assertEqual(len(block.segments), 2)
  124. for seg in block.segments:
  125. self.assertEqual(len(seg.analogsignals), 1)
  126. self.assertEqual(seg.analogsignals[0].shape[-1], 2)
  127. self.assertEqual(seg.analogsignals[0].sampling_rate, 2. * pq.kHz)
  128. self.assertEqual(len(seg.spiketrains), 8)
  129. # Testing different parameter combinations
  130. block = nio.read_block(lazy=True)
  131. self.assertEqual(len(block.segments[0].analogsignals[0]), 0)
  132. self.assertEqual(len(block.segments[0].spiketrains[0]), 0)
  133. block = nio.read_block(load_waveforms=True)
  134. self.assertEqual(len(block.segments[0].analogsignals), 1)
  135. self.assertEqual(len(block.segments[0].spiketrains), 8)
  136. self.assertEqual(block.segments[0].spiketrains[0].waveforms.shape[0],
  137. block.segments[0].spiketrains[0].shape[0])
  138. # this is tetrode data, containing 32 samples per waveform
  139. self.assertEqual(block.segments[0].spiketrains[0].waveforms.shape[1], 4)
  140. self.assertEqual(block.segments[0].spiketrains[0].waveforms.shape[-1], 32)
  141. self.assertGreater(len(block.segments[0].events), 0)
  142. self.assertEqual(len(block.channel_indexes[-1].units[0].spiketrains), 2)
  143. block = nio.read_block(load_waveforms=True, units_group_mode='all-in-one')
  144. self.assertEqual(len(block.channel_indexes[-1].units), 8)
  145. block = nio.read_block(load_waveforms=True, units_group_mode='split-all')
  146. self.assertEqual(len(block.channel_indexes[-1].units), 1) # 1 units by ChannelIndex
  147. def test_read_segment(self):
  148. dirname = self.get_filename_path('Cheetah_v5.5.1/original_data')
  149. nio = NeuralynxIO(dirname=dirname, use_cache=False)
  150. # read first segment entirely
  151. seg = nio.read_segment(seg_index=0, time_slice=None)
  152. self.assertEqual(len(seg.analogsignals), 1)
  153. self.assertEqual(seg.analogsignals[0].shape[-1], 2)
  154. self.assertEqual(seg.analogsignals[0].sampling_rate, 32 * pq.kHz)
  155. self.assertEqual(len(seg.spiketrains), 2)
  156. # Testing different parameter combinations
  157. seg = nio.read_segment(seg_index=0, lazy=True)
  158. self.assertEqual(seg.analogsignals[0].size, 0)
  159. self.assertEqual(seg.spiketrains[0].size, 0)
  160. seg = nio.read_segment(seg_index=0, load_waveforms=True)
  161. self.assertEqual(len(seg.analogsignals), 1)
  162. self.assertEqual(len(seg.spiketrains), 2)
  163. self.assertTrue(len(seg.spiketrains[0].waveforms) > 0)
  164. self.assertTrue(len(seg.events) > 0)
  165. class TestCheetah_v574(CommonNeuralynxIOTest, unittest.TestCase):
  166. cheetah_version = '5.7.4'
  167. files_to_test = []
  168. def test_read_block(self):
  169. dirname = self.get_filename_path('Cheetah_v5.7.4/original_data')
  170. nio = NeuralynxIO(dirname=dirname, use_cache=False)
  171. block = nio.read_block()
  172. # Everything put in one segment
  173. seg = block.segments[0]
  174. self.assertEqual(len(seg.analogsignals), 1)
  175. self.assertEqual(seg.analogsignals[0].shape[-1], 5)
  176. self.assertEqual(seg.analogsignals[0].sampling_rate, 32 * pq.kHz)
  177. self.assertEqual(len(seg.spiketrains), 0) # no nse files available
  178. # Testing different parameter combinations
  179. block = nio.read_block(lazy=True)
  180. self.assertEqual(len(block.segments[0].analogsignals[0]), 0)
  181. block = nio.read_block(load_waveforms=True)
  182. self.assertEqual(len(block.segments[0].analogsignals), 1)
  183. self.assertEqual(len(block.segments[0].spiketrains), 0)
  184. self.assertGreater(len(block.segments[0].events), 0)
  185. block = nio.read_block(signal_group_mode='split-all')
  186. self.assertEqual(len(block.channel_indexes), 5)
  187. block = nio.read_block(signal_group_mode='group-by-same-units')
  188. self.assertEqual(len(block.channel_indexes), 1)
  189. class TestData(CommonNeuralynxIOTest, unittest.TestCase):
  190. def test_ncs(self):
  191. for session in self.files_to_test[1:2]: # in the long run this should include all files
  192. dirname = self.get_filename_path(session)
  193. nio = NeuralynxIO(dirname=dirname, use_cache=False)
  194. block = nio.read_block()
  195. for anasig_id, anasig in enumerate(block.segments[0].analogsignals):
  196. chid = anasig.channel_index.annotations['channel_id'][anasig_id]
  197. filename = nio.ncs_filenames[chid][:-3] + 'txt'
  198. filename = filename.replace('original_data', 'plain_data')
  199. plain_data = np.loadtxt(filename)[:, 5:].flatten() # first columns are meta info
  200. overlap = 512 * 500
  201. gain_factor_0 = plain_data[0] / anasig.magnitude[0, 0]
  202. np.testing.assert_allclose(plain_data[:overlap],
  203. anasig.magnitude[:overlap, 0] * gain_factor_0,
  204. rtol=0.01)
  205. class TestGaps(CommonNeuralynxIOTest, unittest.TestCase):
  206. def test_gap_handling_v551(self):
  207. dirname = self.get_filename_path('Cheetah_v5.5.1/original_data')
  208. nio = NeuralynxIO(dirname=dirname, use_cache=False)
  209. block = nio.read_block()
  210. # known gap values
  211. n_gaps = 1
  212. # so 2 segments, 2 anasigs by Channelindex, 2 SpikeTrain by Units
  213. self.assertEqual(len(block.segments), n_gaps + 1)
  214. self.assertEqual(len(block.channel_indexes[0].analogsignals), n_gaps + 1)
  215. self.assertEqual(len(block.channel_indexes[-1].units[0].spiketrains), n_gaps + 1)
  216. def test_gap_handling_v563(self):
  217. dirname = self.get_filename_path('Cheetah_v5.6.3/original_data')
  218. nio = NeuralynxIO(dirname=dirname, use_cache=False)
  219. block = nio.read_block()
  220. # known gap values
  221. n_gaps = 1
  222. # so 2 segments, 2 anasigs by Channelindex, 2 SpikeTrain by Units
  223. self.assertEqual(len(block.segments), n_gaps + 1)
  224. self.assertEqual(len(block.channel_indexes[0].analogsignals), n_gaps + 1)
  225. self.assertEqual(len(block.channel_indexes[-1].units[0].spiketrains), n_gaps + 1)
  226. def compare_old_and_new_neuralynxio():
  227. base = '/tmp/files_for_testing_neo/neuralynx/'
  228. dirname = base + 'Cheetah_v5.5.1/original_data/'
  229. # ~ dirname = base+'Cheetah_v5.7.4/original_data/'
  230. t0 = time.perf_counter()
  231. newreader = NewNeuralynxIO(dirname)
  232. t1 = time.perf_counter()
  233. bl1 = newreader.read_block(load_waveforms=True)
  234. t2 = time.perf_counter()
  235. print('newreader header', t1 - t0, 's')
  236. print('newreader data', t2 - t1, 's')
  237. print('newreader toal', t2 - t0, 's')
  238. for seg in bl1.segments:
  239. print('seg', seg.index)
  240. for anasig in seg.analogsignals:
  241. print(' AnalogSignal', anasig.name, anasig.shape, anasig.t_start)
  242. for st in seg.spiketrains:
  243. print(' SpikeTrain', st.name, st.shape, st.waveforms.shape, st[:5])
  244. for ev in seg.events:
  245. print(' Event', ev.name, ev.times.shape)
  246. print('*' * 10)
  247. t0 = time.perf_counter()
  248. oldreader = OldNeuralynxIO(sessiondir=dirname, use_cache='never')
  249. t1 = time.perf_counter()
  250. bl2 = oldreader.read_block(waveforms=True, events=True)
  251. t2 = time.perf_counter()
  252. print('oldreader header', t1 - t0, 's')
  253. print('oldreader data', t2 - t1, 's')
  254. print('oldreader toal', t2 - t0, 's')
  255. for seg in bl2.segments:
  256. print('seg', seg.index)
  257. for anasig in seg.analogsignals:
  258. print(' AnalogSignal', anasig.name, anasig.shape, anasig.t_start)
  259. for st in seg.spiketrains:
  260. print(' SpikeTrain', st.name, st.shape, st.waveforms.shape, st[:5])
  261. for ev in seg.events:
  262. print(' Event', ev.name, ev.times.shape)
  263. print('*' * 10)
  264. compare_neo_content(bl1, bl2)
  265. def compare_neo_content(bl1, bl2):
  266. print('*' * 5, 'Comparison of blocks', '*' * 5)
  267. object_types_to_test = [Segment, ChannelIndex, Unit, AnalogSignal,
  268. SpikeTrain, Event, Epoch]
  269. for objtype in object_types_to_test:
  270. print('Testing {}'.format(objtype))
  271. children1 = bl1.list_children_by_class(objtype)
  272. children2 = bl2.list_children_by_class(objtype)
  273. if len(children1) != len(children2):
  274. warnings.warn('Number of {} is different in both blocks ({} != {}).'
  275. ' Skipping comparison'
  276. ''.format(objtype, len(children1), len(children2)))
  277. continue
  278. for child1, child2 in zip(children1, children2):
  279. compare_annotations(child1.annotations, child2.annotations)
  280. compare_attributes(child1, child2)
  281. def compare_annotations(anno1, anno2):
  282. if len(anno1) != len(anno2):
  283. warnings.warn('Different numbers of annotations! {} != {}\nSkipping further comparison of '
  284. 'this annotation list.'.format(anno1.keys(), anno2.keys()))
  285. return
  286. assert anno1.keys() == anno2.keys()
  287. for key in anno1.keys():
  288. anno1[key] = anno2[key]
  289. def compare_attributes(child1, child2):
  290. assert child1._all_attrs == child2._all_attrs
  291. for attr_id in range(len(child1._all_attrs)):
  292. attr_name = child1._all_attrs[attr_id][0]
  293. attr_dtype = child1._all_attrs[attr_id][1]
  294. if type(child1) == AnalogSignal and attr_name == 'signal':
  295. continue
  296. if type(child1) == SpikeTrain and attr_name == 'times':
  297. continue
  298. unequal = child1.__getattribute__(attr_name) != child2.__getattribute__(attr_name)
  299. if hasattr(unequal, 'any'):
  300. unequal = unequal.any()
  301. if unequal:
  302. warnings.warn('Attributes differ! {}.{}={} is not equal to {}.{}={}'
  303. ''.format(child1.__class__.__name__, attr_name,
  304. child1.__getattribute__(attr_name),
  305. child2.__class__.__name__, attr_name,
  306. child2.__getattribute__(attr_name)))
  307. if __name__ == '__main__':
  308. unittest.main()
  309. # ~ compare_old_and_new_neuralynxio()