test_blackrockio.py 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485
  1. """
  2. Tests of neo.io.blackrockio
  3. """
  4. import unittest
  5. import warnings
  6. from numpy.testing import assert_equal
  7. import numpy as np
  8. import quantities as pq
  9. from neo.io.blackrockio import BlackrockIO
  10. from neo.test.iotest.common_io_test import BaseTestIO
  11. from neo.test.iotest.tools import get_test_file_full_path
  12. from neo.test.tools import assert_neo_object_is_compliant
  13. # check scipy
  14. try:
  15. from distutils import version
  16. import scipy.io
  17. import scipy.version
  18. except ImportError as err:
  19. HAVE_SCIPY = False
  20. SCIPY_ERR = err
  21. else:
  22. if version.LooseVersion(scipy.version.version) < '0.8':
  23. HAVE_SCIPY = False
  24. SCIPY_ERR = ImportError("your scipy version is too old to support " +
  25. "MatlabIO, you need at least 0.8. " +
  26. "You have %s" % scipy.version.version)
  27. else:
  28. HAVE_SCIPY = True
  29. SCIPY_ERR = None
  30. class CommonTests(BaseTestIO, unittest.TestCase):
  31. ioclass = BlackrockIO
  32. files_to_test = ['FileSpec2.3001',
  33. 'blackrock_2_1/l101210-001']
  34. files_to_download = [
  35. 'FileSpec2.3001.nev',
  36. 'FileSpec2.3001.ns5',
  37. 'FileSpec2.3001.ccf',
  38. 'FileSpec2.3001.mat',
  39. 'blackrock_2_1/l101210-001.mat',
  40. 'blackrock_2_1/l101210-001_nev-02_ns5.mat',
  41. 'blackrock_2_1/l101210-001.ns2',
  42. 'blackrock_2_1/l101210-001.ns5',
  43. 'blackrock_2_1/l101210-001.nev',
  44. 'blackrock_2_1/l101210-001-02.nev',
  45. 'segment/PauseCorrect/pause_correct.nev',
  46. 'segment/PauseCorrect/pause_correct.ns2',
  47. 'segment/PauseSpikesOutside/pause_spikes_outside_seg.nev',
  48. 'segment/ResetCorrect/reset.nev',
  49. 'segment/ResetCorrect/reset.ns2',
  50. 'segment/ResetFail/reset_fail.nev']
  51. ioclass = BlackrockIO
  52. def test_load_waveforms(self):
  53. filename = self.get_filename_path('FileSpec2.3001')
  54. reader = BlackrockIO(filename=filename, verbose=False)
  55. bl = reader.read_block(load_waveforms=True)
  56. assert_neo_object_is_compliant(bl)
  57. def test_inputs_V23(self):
  58. """
  59. Test various inputs to BlackrockIO.read_block with version 2.3 file
  60. to check for parsing errors.
  61. """
  62. filename = self.get_filename_path('FileSpec2.3001')
  63. reader = BlackrockIO(filename=filename, verbose=False, nsx_to_load=5)
  64. # Assert IOError is raised when no Blackrock files are available
  65. with self.assertRaises(IOError):
  66. reader2 = BlackrockIO(filename='nonexistent')
  67. # Load data to maximum extent, one None is not given as list
  68. block = reader.read_block(load_waveforms=False)
  69. lena = len(block.segments[0].analogsignals[0])
  70. numspa = len(block.segments[0].spiketrains[0])
  71. # Load data using a negative time and a time exceeding the end of the
  72. # recording
  73. too_large_tstop = block.segments[0].analogsignals[0].t_stop + 1 * pq.s
  74. buggy_slice = (-100 * pq.ms, too_large_tstop)
  75. # this is valid in read_segment
  76. seg = reader.read_segment(seg_index=0, time_slice=buggy_slice, strict_slicing=False)
  77. # this raise an error
  78. with self.assertRaises(AssertionError):
  79. seg = reader.read_segment(seg_index=0, time_slice=buggy_slice, strict_slicing=True)
  80. lenb = len(seg.analogsignals[0])
  81. numspb = len(seg.spiketrains[0])
  82. # Same length of analog signal?
  83. # Both should have read the complete data set!
  84. self.assertEqual(lena, lenb)
  85. # Same length of spike train?
  86. # Both should have read the complete data set!
  87. self.assertEqual(numspa, numspb)
  88. # test 4 Units
  89. block = reader.read_block(load_waveforms=True,
  90. signal_group_mode='split-all')
  91. self.assertEqual(len(block.segments[0].analogsignals), 10)
  92. #self.assertEqual(len(block.channel_indexes[-1].units), 4)
  93. #self.assertEqual(len(block.channel_indexes[-1].units),
  94. # len(block.segments[0].spiketrains))
  95. anasig = block.segments[0].analogsignals[0]
  96. self.assertIsNotNone(anasig.file_origin)
  97. def test_inputs_V21(self):
  98. """
  99. Test various inputs to BlackrockIO.read_block with version 2.3 file
  100. to check for parsing errors.
  101. """
  102. filename = self.get_filename_path('blackrock_2_1/l101210-001')
  103. reader = BlackrockIO(filename=filename, verbose=False, nsx_to_load=5)
  104. # Assert IOError is raised when no Blackrock files are available
  105. with self.assertRaises(IOError):
  106. reader2 = BlackrockIO(filename='nonexistent')
  107. # with self.assertRaises(IOError):
  108. # reader2 = BlackrockIO(filename=filename, nev_override='nonexistent')
  109. # Load data to maximum extent, one None is not given as list
  110. block = reader.read_block(load_waveforms=False, signal_group_mode='split-all')
  111. lena = len(block.segments[0].analogsignals[0])
  112. numspa = len(block.segments[0].spiketrains[0])
  113. # Load data using a negative time and a time exceeding the end of the
  114. # recording
  115. too_large_tstop = block.segments[0].analogsignals[0].t_stop + 1 * pq.s
  116. buggy_slice = (-100 * pq.ms, too_large_tstop)
  117. # This is valid in read_segment
  118. seg = reader.read_segment(seg_index=0, time_slice=buggy_slice, strict_slicing=False)
  119. # this raise error
  120. with self.assertRaises(AssertionError):
  121. seg = reader.read_segment(seg_index=0, time_slice=buggy_slice, strict_slicing=True)
  122. lenb = len(seg.analogsignals[0])
  123. numspb = len(seg.spiketrains[0])
  124. # Same length of analog signal?
  125. # Both should have read the complete data set!
  126. self.assertEqual(lena, lenb)
  127. # Same length of spike train?
  128. # Both should have read the complete data set!
  129. self.assertEqual(numspa, numspb)
  130. # test 4 Units
  131. block = reader.read_block(load_waveforms=True,
  132. signal_group_mode='split-all',
  133. )#units_group_mode='all-in-one')
  134. self.assertEqual(len(block.segments[0].analogsignals), 96)
  135. #self.assertEqual(len(block.channel_indexes[-1].units), 218)
  136. #self.assertEqual(len(block.channel_indexes[-1].units),
  137. # len(block.segments[0].spiketrains))
  138. anasig = block.segments[0].analogsignals[0]
  139. self.assertIsNotNone(anasig.file_origin)
  140. def test_load_muliple_nsx(self):
  141. """
  142. Test if multiple nsx signals can be loaded at the same time.
  143. """
  144. filename = self.get_filename_path('blackrock_2_1/l101210-001')
  145. reader = BlackrockIO(filename=filename, verbose=False, nsx_to_load='all')
  146. # number of different sampling rates corresponds to number of nsx signals, because
  147. # single nsx contains only signals of identical sampling rate
  148. block = reader.read_block(load_waveforms=False)
  149. sampling_rates = np.unique(
  150. [a.sampling_rate.rescale('Hz') for a in block.filter(objects='AnalogSignal')])
  151. self.assertEqual(len(sampling_rates), 2)
  152. segment = reader.read_segment()
  153. sampling_rates = np.unique(
  154. [a.sampling_rate.rescale('Hz') for a in segment.filter(objects='AnalogSignal')])
  155. self.assertEqual(len(sampling_rates), 2)
  156. # load only ns5
  157. reader = BlackrockIO(filename=filename, nsx_to_load=5)
  158. seg = reader.read_segment()
  159. self.assertEqual(len(seg.analogsignals), 1)
  160. self.assertEqual(seg.analogsignals[0].shape, (109224, 96))
  161. # load only ns2
  162. reader = BlackrockIO(filename=filename, nsx_to_load=2)
  163. seg = reader.read_segment()
  164. self.assertEqual(len(seg.analogsignals), 1)
  165. self.assertEqual(seg.analogsignals[0].shape, (3640, 6))
  166. # load only ns2
  167. reader = BlackrockIO(filename=filename, nsx_to_load=[2])
  168. seg = reader.read_segment()
  169. self.assertEqual(len(seg.analogsignals), 1)
  170. # load ns2 + ns5
  171. reader = BlackrockIO(filename=filename, nsx_to_load=[2, 5])
  172. seg = reader.read_segment()
  173. self.assertEqual(len(seg.analogsignals), 2)
  174. self.assertEqual(seg.analogsignals[0].shape, (3640, 6))
  175. self.assertEqual(seg.analogsignals[1].shape, (109224, 96))
  176. # load only ns5
  177. reader = BlackrockIO(filename=filename, nsx_to_load='max')
  178. seg = reader.read_segment()
  179. self.assertEqual(len(seg.analogsignals), 1)
  180. self.assertEqual(seg.analogsignals[0].shape, (109224, 96))
  181. @unittest.skipUnless(HAVE_SCIPY, "requires scipy")
  182. def test_compare_blackrockio_with_matlabloader_v21(self):
  183. """
  184. This test compares the output of BlackrockIO.read_block() with the
  185. output generated by a Matlab implementation of a Blackrock file reader
  186. provided by the company. The output for comparison is provided in a
  187. .mat file created by the script create_data_matlab_blackrock.m.
  188. The function tests LFPs, spike times, and digital events.
  189. """
  190. dirname = get_test_file_full_path(ioclass=BlackrockIO,
  191. filename='blackrock_2_1/l101210-001',
  192. directory=self.local_test_dir, clean=False)
  193. # First run with parameters for ns5, then run with correct parameters for ns2
  194. parameters = [('blackrock_2_1/l101210-001_nev-02_ns5.mat',
  195. {'nsx_to_load': 5, 'nev_override': '-'.join([dirname, '02'])}),
  196. ('blackrock_2_1/l101210-001.mat', {'nsx_to_load': 2})]
  197. for index, param in enumerate(parameters):
  198. # Load data from matlab generated files
  199. ml = scipy.io.loadmat(
  200. get_test_file_full_path(
  201. ioclass=BlackrockIO,
  202. filename=param[0],
  203. directory=self.local_test_dir, clean=False))
  204. lfp_ml = ml['lfp'] # (channel x time) LFP matrix
  205. ts_ml = ml['ts'] # spike time stamps
  206. elec_ml = ml['el'] # spike electrodes
  207. unit_ml = ml['un'] # spike unit IDs
  208. wf_ml = ml['wf'] # waveforms
  209. mts_ml = ml['mts'] # marker time stamps
  210. mid_ml = ml['mid'] # marker IDs
  211. # Load data from original data files using the Neo BlackrockIO
  212. session = BlackrockIO(
  213. dirname,
  214. verbose=False, **param[1])
  215. block = session.read_block(load_waveforms=True, signal_group_mode='split-all')
  216. # Check if analog data are equal
  217. self.assertGreater(len(block.groups), 0)
  218. for i, chidx_grp in enumerate(block.channel_indexes):
  219. # Break for ChannelIndexes for Units that don't contain any Analogsignals
  220. if len(chidx_grp.analogsignals) == 0 and len(chidx_grp.spiketrains) >= 1:
  221. break
  222. # Should only have one AnalogSignal per ChannelIndex-representing Group
  223. self.assertEqual(len(chidx_grp.analogsignals), 1)
  224. # Find out channel_id in order to compare correctly
  225. idx = chidx_grp.analogsignals[0].annotations['channel_id']
  226. # Get data of AnalogSignal without pq.units
  227. anasig = np.squeeze(chidx_grp.analogsignals[0].base[:].magnitude)
  228. # Test for equality of first nonzero values of AnalogSignal
  229. # and matlab file contents
  230. # If not equal test if hardcoded gain is responsible for this
  231. # See BlackrockRawIO ll. 1420 commit 77a645655605ae39eca2de3ee511f3b522f11bd7
  232. j = 0
  233. while anasig[j] == 0:
  234. j += 1
  235. if lfp_ml[i, j] != np.squeeze(chidx_grp.analogsignals[0].base[j].magnitude):
  236. anasig = anasig / 152.592547
  237. anasig = np.round(anasig).astype(int)
  238. # Special case because id 142 is not included in ns2 file
  239. if idx == 143:
  240. idx -= 1
  241. if idx > 128:
  242. idx = idx - 136
  243. assert_equal(anasig, lfp_ml[idx - 1, :])
  244. # Check if spikes are equal
  245. self.assertEqual(len(block.segments), 1)
  246. for st_i in block.segments[0].spiketrains:
  247. channelid = st_i.annotations['channel_id']
  248. unitid = st_i.annotations['unit_id']
  249. # Compare waveforms
  250. matlab_wf = wf_ml[np.nonzero(
  251. np.logical_and(elec_ml == channelid, unit_ml == unitid)), :][0]
  252. # Atleast_2d as correction for waveforms that are saved
  253. # in single dimension in SpikeTrain
  254. # because only one waveform is available
  255. assert_equal(np.atleast_2d(np.squeeze(st_i.waveforms).magnitude), matlab_wf)
  256. # Compare spike timestamps
  257. matlab_spikes = ts_ml[np.nonzero(
  258. np.logical_and(elec_ml == channelid, unit_ml == unitid))]
  259. # Going sure that unit is really seconds and not 1/30000 seconds
  260. if (not st_i.units == pq.CompoundUnit("1.0/{} * s".format(30000))) and \
  261. st_i.units == pq.s:
  262. st_i = np.round(st_i.base * 30000).astype(int)
  263. assert_equal(st_i, matlab_spikes)
  264. # Check if digital input port events are equal
  265. self.assertGreater(len(block.segments[0].events), 0)
  266. for ea_i in block.segments[0].events:
  267. if ea_i.name == 'digital_input_port':
  268. # Get all digital event IDs in this recording
  269. marker_ids = set(ea_i.labels)
  270. for marker_id in marker_ids:
  271. python_digievents = np.round(
  272. ea_i.times.base[ea_i.labels == marker_id] * 30000).astype(int)
  273. matlab_digievents = mts_ml[
  274. np.nonzero(mid_ml == int(marker_id))]
  275. assert_equal(python_digievents, matlab_digievents)
  276. # Note: analog input events are not yet supported
  277. def test_segment_detection_reset(self):
  278. """
  279. This test makes sure segments are detected correctly when reset was used during recording.
  280. """
  281. # Path to nev that will fail
  282. filename_nev_fail = self.get_filename_path('segment/ResetFail/reset_fail')
  283. # Path to nsX and nev that will NOT fail
  284. filename = self.get_filename_path('segment/ResetCorrect/reset')
  285. # Warning filter needs to be set to always before first occurrence of this warning
  286. warnings.simplefilter("always", UserWarning)
  287. # This fails, because in the nev there is no way to separate two segments
  288. with self.assertRaises(AssertionError):
  289. reader = BlackrockIO(filename=filename, nsx_to_load=2, nev_override=filename_nev_fail)
  290. # The correct file will issue a warning because a reset has occurred
  291. # and could be detected, but was not explicitly documented in the file
  292. with warnings.catch_warnings(record=True) as w:
  293. reader = BlackrockIO(filename=filename, nsx_to_load=2)
  294. self.assertGreaterEqual(len(w), 1)
  295. messages = [str(warning.message) for warning in w if warning.category == UserWarning]
  296. self.assertIn("Detected 1 undocumented segments within nev data after "
  297. "timestamps [5451].", messages)
  298. # Manually reset warning filter in order to not show too many warnings afterwards
  299. warnings.simplefilter("default")
  300. block = reader.read_block(load_waveforms=False, signal_group_mode="split-all")
  301. # 1 Segment at the beginning and 1 after reset
  302. self.assertEqual(len(block.segments), 2)
  303. # Checking all times are correct as read from file itself
  304. # (taking neo calculations into account)
  305. self.assertEqual(block.segments[0].t_start, 0.0)
  306. self.assertEqual(block.segments[0].t_stop, 4.02)
  307. # Clock is reset to 0
  308. self.assertEqual(block.segments[1].t_start, 0.0032)
  309. self.assertEqual(block.segments[1].t_stop, 3.9842)
  310. self.assertEqual(block.segments[0].analogsignals[0].t_start, 0.0)
  311. self.assertEqual(block.segments[0].analogsignals[0].t_stop, 4.02)
  312. self.assertEqual(block.segments[1].analogsignals[0].t_start, 0.0032)
  313. self.assertEqual(block.segments[1].analogsignals[0].t_stop, 3.9842)
  314. self.assertEqual(block.segments[0].spiketrains[0].t_start, 0.0)
  315. self.assertEqual(block.segments[0].spiketrains[0].t_stop, 4.02)
  316. self.assertEqual(block.segments[1].spiketrains[0].t_start, 0.0032)
  317. self.assertEqual(block.segments[1].spiketrains[0].t_stop, 3.9842)
  318. # Each segment must have the same number of analogsignals
  319. self.assertEqual(len(block.segments[0].analogsignals),
  320. len(block.segments[1].analogsignals))
  321. # Length of analogsignals as created
  322. self.assertEqual(len(block.segments[0].analogsignals[0][:]), 4020)
  323. self.assertEqual(len(block.segments[1].analogsignals[0][:]), 3981)
  324. def test_segment_detection_pause(self):
  325. """
  326. This test makes sure segments are detected correctly when pause was used during recording.
  327. """
  328. # Path to nev that has spikes that don't fit nsX segment
  329. filename_nev_outside_seg = self.get_filename_path(
  330. 'segment/PauseSpikesOutside/pause_spikes_outside_seg')
  331. # Path to nsX and nev that are correct
  332. filename = self.get_filename_path('segment/PauseCorrect/pause_correct')
  333. # This issues a warning, because there are spikes a long time after the last segment
  334. # And another one because there are spikes between segments
  335. with warnings.catch_warnings(record=True) as w:
  336. warnings.simplefilter("always")
  337. reader = BlackrockIO(filename=filename, nsx_to_load=2,
  338. nev_override=filename_nev_outside_seg)
  339. self.assertGreaterEqual(len(w), 2)
  340. # Check that warnings are correct
  341. messages = [str(warning.message) for warning in w if warning.category == UserWarning]
  342. self.assertIn('Spikes outside any segment. Detected on segment #1', messages)
  343. self.assertIn('Spikes 0.0776s after last segment.', messages)
  344. block = reader.read_block(load_waveforms=False, signal_group_mode="split-all")
  345. # 2 segments
  346. self.assertEqual(len(block.segments), 2)
  347. # Checking all times are correct as read from file itself
  348. # (taking neo calculations into account)
  349. self.assertEqual(block.segments[0].t_start, 0.0)
  350. # This value is so high, because a spike occurred right before the second segment
  351. # And thus is added to the first segment
  352. # This is not normal behavior and occurs because of the way the files were cut
  353. # into test files
  354. self.assertAlmostEqual(block.segments[0].t_stop.magnitude, 15.83916667)
  355. # Clock is not reset
  356. self.assertEqual(block.segments[1].t_start.magnitude, 31.0087)
  357. # Segment time is longer here as well because of spikes after second segment
  358. self.assertEqual(block.segments[1].t_stop.magnitude, 35.0863)
  359. self.assertEqual(block.segments[0].analogsignals[0].t_start, 0.0)
  360. # The AnalogSignal is only 4 seconds long, as opposed to the segment
  361. # whose length is caused by the additional spike
  362. self.assertEqual(block.segments[0].analogsignals[0].t_stop, 4.0)
  363. self.assertEqual(block.segments[1].analogsignals[0].t_start, 31.0087)
  364. self.assertAlmostEqual(block.segments[1].analogsignals[0].t_stop.magnitude, 35.0087,
  365. places=6)
  366. self.assertEqual(block.segments[0].spiketrains[0].t_start, 0.0)
  367. self.assertAlmostEqual(block.segments[0].spiketrains[0].t_stop.magnitude, 15.83916667,
  368. places=8)
  369. self.assertEqual(block.segments[1].spiketrains[0].t_start, 31.0087)
  370. self.assertEqual(block.segments[1].spiketrains[0].t_stop, 35.0863)
  371. # Each segment has same number of analogsignals
  372. self.assertEqual(len(block.segments[0].analogsignals),
  373. len(block.segments[1].analogsignals))
  374. # Analogsignals have exactly 4000 samples
  375. self.assertEqual(len(block.segments[0].analogsignals[0][:]), 4000)
  376. self.assertEqual(len(block.segments[1].analogsignals[0][:]), 4000)
  377. # This case is correct, no spikes outside segment or anything
  378. reader = BlackrockIO(filename=filename, nsx_to_load=2)
  379. block = reader.read_block(load_waveforms=False, signal_group_mode="split-all")
  380. # 2 segments
  381. self.assertEqual(len(block.segments), 2)
  382. # Checking all times are correct as read from file itself
  383. # (taking neo calculations into account)
  384. self.assertEqual(block.segments[0].t_start, 0.0)
  385. # Now segment time is only 4 seconds, because there were no additional spikes
  386. self.assertEqual(block.segments[0].t_stop, 4.0)
  387. self.assertEqual(block.segments[1].t_start, 31.0087)
  388. self.assertAlmostEqual(block.segments[1].t_stop.magnitude, 35.0087, places=6)
  389. self.assertEqual(block.segments[0].analogsignals[0].t_start, 0.0)
  390. self.assertEqual(block.segments[0].analogsignals[0].t_stop, 4.0)
  391. self.assertEqual(block.segments[1].analogsignals[0].t_start, 31.0087)
  392. self.assertAlmostEqual(block.segments[1].analogsignals[0].t_stop.magnitude, 35.0087,
  393. places=6)
  394. self.assertEqual(block.segments[0].spiketrains[0].t_start, 0.0)
  395. self.assertEqual(block.segments[0].spiketrains[0].t_stop, 4.0)
  396. self.assertEqual(block.segments[1].spiketrains[0].t_start, 31.0087)
  397. self.assertAlmostEqual(block.segments[1].spiketrains[0].t_stop.magnitude, 35.0087,
  398. places=6)
  399. # Each segment has same number of analogsignals
  400. self.assertEqual(len(block.segments[0].analogsignals),
  401. len(block.segments[1].analogsignals))
  402. # ns2 was created in such a way that all analogsignals have 4000 samples
  403. self.assertEqual(len(block.segments[0].analogsignals[0][:]), 4000)
  404. self.assertEqual(len(block.segments[1].analogsignals[0][:]), 4000)
  405. if __name__ == '__main__':
  406. unittest.main()