test_blackrockio.py 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452
  1. # -*- coding: utf-8 -*-
  2. """
  3. Tests of neo.io.blackrockio
  4. """
  5. # needed for python 3 compatibility
  6. from __future__ import absolute_import
  7. import unittest
  8. import warnings
  9. from numpy.testing import assert_equal
  10. import numpy as np
  11. import quantities as pq
  12. from neo.io.blackrockio import BlackrockIO
  13. from neo.test.iotest.common_io_test import BaseTestIO
  14. from neo.test.iotest.tools import get_test_file_full_path
  15. from neo.test.tools import assert_neo_object_is_compliant
  16. # check scipy
  17. try:
  18. from distutils import version
  19. import scipy.io
  20. import scipy.version
  21. except ImportError as err:
  22. HAVE_SCIPY = False
  23. SCIPY_ERR = err
  24. else:
  25. if version.LooseVersion(scipy.version.version) < '0.8':
  26. HAVE_SCIPY = False
  27. SCIPY_ERR = ImportError("your scipy version is too old to support " +
  28. "MatlabIO, you need at least 0.8. " +
  29. "You have %s" % scipy.version.version)
  30. else:
  31. HAVE_SCIPY = True
  32. SCIPY_ERR = None
  33. class CommonTests(BaseTestIO, unittest.TestCase):
  34. ioclass = BlackrockIO
  35. files_to_test = ['FileSpec2.3001']
  36. files_to_download = [
  37. 'FileSpec2.3001.nev',
  38. 'FileSpec2.3001.ns5',
  39. 'FileSpec2.3001.ccf',
  40. 'FileSpec2.3001.mat',
  41. 'blackrock_2_1/l101210-001.mat',
  42. 'blackrock_2_1/l101210-001_nev-02_ns5.mat',
  43. 'blackrock_2_1/l101210-001.ns2',
  44. 'blackrock_2_1/l101210-001.ns5',
  45. 'blackrock_2_1/l101210-001.nev',
  46. 'blackrock_2_1/l101210-001-02.nev',
  47. 'segment/PauseCorrect/pause_correct.nev',
  48. 'segment/PauseCorrect/pause_correct.ns2',
  49. 'segment/PauseSpikesOutside/pause_spikes_outside_seg.nev',
  50. 'segment/ResetCorrect/reset.nev',
  51. 'segment/ResetCorrect/reset.ns2',
  52. 'segment/ResetFail/reset_fail.nev']
  53. ioclass = BlackrockIO
  54. def test_load_waveforms(self):
  55. filename = self.get_filename_path('FileSpec2.3001')
  56. reader = BlackrockIO(filename=filename, verbose=False)
  57. bl = reader.read_block(load_waveforms=True)
  58. assert_neo_object_is_compliant(bl)
  59. def test_inputs_V23(self):
  60. """
  61. Test various inputs to BlackrockIO.read_block with version 2.3 file
  62. to check for parsing errors.
  63. """
  64. filename = self.get_filename_path('FileSpec2.3001')
  65. reader = BlackrockIO(filename=filename, verbose=False, nsx_to_load=5)
  66. # Assert IOError is raised when no Blackrock files are available
  67. with self.assertRaises(IOError):
  68. reader2 = BlackrockIO(filename='nonexistent')
  69. # Load data to maximum extent, one None is not given as list
  70. block = reader.read_block(load_waveforms=False)
  71. lena = len(block.segments[0].analogsignals[0])
  72. numspa = len(block.segments[0].spiketrains[0])
  73. # Load data using a negative time and a time exceeding the end of the
  74. # recording
  75. too_large_tstop = block.segments[0].analogsignals[0].t_stop + 1 * pq.s
  76. buggy_slice = (-100 * pq.ms, too_large_tstop)
  77. # this is valid in read_segment because seg_index is specified
  78. seg = reader.read_segment(seg_index=0, time_slice=buggy_slice)
  79. lenb = len(seg.analogsignals[0])
  80. numspb = len(seg.spiketrains[0])
  81. # Same length of analog signal?
  82. # Both should have read the complete data set!
  83. self.assertEqual(lena, lenb)
  84. # Same length of spike train?
  85. # Both should have read the complete data set!
  86. self.assertEqual(numspa, numspb)
  87. # test 4 Units
  88. block = reader.read_block(load_waveforms=True,
  89. units_group_mode='all-in-one')
  90. self.assertEqual(len(block.segments[0].analogsignals), 10)
  91. self.assertEqual(len(block.channel_indexes[-1].units), 4)
  92. self.assertEqual(len(block.channel_indexes[-1].units),
  93. len(block.segments[0].spiketrains))
  94. anasig = block.segments[0].analogsignals[0]
  95. self.assertIsNotNone(anasig.file_origin)
  96. def test_inputs_V21(self):
  97. """
  98. Test various inputs to BlackrockIO.read_block with version 2.3 file
  99. to check for parsing errors.
  100. """
  101. filename = self.get_filename_path('blackrock_2_1/l101210-001')
  102. reader = BlackrockIO(filename=filename, verbose=False, nsx_to_load=5)
  103. # Assert IOError is raised when no Blackrock files are available
  104. with self.assertRaises(IOError):
  105. reader2 = BlackrockIO(filename='nonexistent')
  106. # with self.assertRaises(IOError):
  107. # reader2 = BlackrockIO(filename=filename, nev_override='nonexistent')
  108. # Load data to maximum extent, one None is not given as list
  109. block = reader.read_block(load_waveforms=False)
  110. lena = len(block.segments[0].analogsignals[0])
  111. numspa = len(block.segments[0].spiketrains[0])
  112. # Load data using a negative time and a time exceeding the end of the
  113. # recording
  114. too_large_tstop = block.segments[0].analogsignals[0].t_stop + 1 * pq.s
  115. buggy_slice = (-100 * pq.ms, too_large_tstop)
  116. # This is valid in read_segment because seg_index is specified
  117. seg = reader.read_segment(seg_index=0, time_slice=buggy_slice)
  118. lenb = len(seg.analogsignals[0])
  119. numspb = len(seg.spiketrains[0])
  120. # Same length of analog signal?
  121. # Both should have read the complete data set!
  122. self.assertEqual(lena, lenb)
  123. # Same length of spike train?
  124. # Both should have read the complete data set!
  125. self.assertEqual(numspa, numspb)
  126. # test 4 Units
  127. block = reader.read_block(load_waveforms=True,
  128. units_group_mode='all-in-one')
  129. self.assertEqual(len(block.segments[0].analogsignals), 96)
  130. self.assertEqual(len(block.channel_indexes[-1].units), 218)
  131. self.assertEqual(len(block.channel_indexes[-1].units),
  132. len(block.segments[0].spiketrains))
  133. anasig = block.segments[0].analogsignals[0]
  134. self.assertIsNotNone(anasig.file_origin)
  135. def test_load_muliple_nsx(self):
  136. """
  137. Test if multiple nsx signals can be loaded at the same time.
  138. """
  139. filename = self.get_filename_path('blackrock_2_1/l101210-001')
  140. reader = BlackrockIO(filename=filename, verbose=False, nsx_to_load='all')
  141. # number of different sampling rates corresponds to number of nsx signals, because
  142. # single nsx contains only signals of identical sampling rate
  143. block = reader.read_block(load_waveforms=False)
  144. sampling_rates = np.unique(
  145. [a.sampling_rate.rescale('Hz') for a in block.filter(objects='AnalogSignal')])
  146. self.assertEqual(len(sampling_rates), len(reader._selected_nsx))
  147. segment = reader.read_segment()
  148. sampling_rates = np.unique(
  149. [a.sampling_rate.rescale('Hz') for a in segment.filter(objects='AnalogSignal')])
  150. self.assertEqual(len(sampling_rates), len(reader._selected_nsx))
  151. @unittest.skipUnless(HAVE_SCIPY, "requires scipy")
  152. def test_compare_blackrockio_with_matlabloader_v21(self):
  153. """
  154. This test compares the output of BlackrockIO.read_block() with the
  155. output generated by a Matlab implementation of a Blackrock file reader
  156. provided by the company. The output for comparison is provided in a
  157. .mat file created by the script create_data_matlab_blackrock.m.
  158. The function tests LFPs, spike times, and digital events.
  159. """
  160. dirname = get_test_file_full_path(ioclass=BlackrockIO,
  161. filename='blackrock_2_1/l101210-001',
  162. directory=self.local_test_dir, clean=False)
  163. # First run with parameters for ns5, then run with correct parameters for ns2
  164. parameters = [('blackrock_2_1/l101210-001_nev-02_ns5.mat',
  165. {'nsx_to_load': 5, 'nev_override': '-'.join([dirname, '02'])}),
  166. ('blackrock_2_1/l101210-001.mat', {'nsx_to_load': 2})]
  167. for index, param in enumerate(parameters):
  168. # Load data from matlab generated files
  169. ml = scipy.io.loadmat(
  170. get_test_file_full_path(
  171. ioclass=BlackrockIO,
  172. filename=param[0],
  173. directory=self.local_test_dir, clean=False))
  174. lfp_ml = ml['lfp'] # (channel x time) LFP matrix
  175. ts_ml = ml['ts'] # spike time stamps
  176. elec_ml = ml['el'] # spike electrodes
  177. unit_ml = ml['un'] # spike unit IDs
  178. wf_ml = ml['wf'] # waveforms
  179. mts_ml = ml['mts'] # marker time stamps
  180. mid_ml = ml['mid'] # marker IDs
  181. # Load data from original data files using the Neo BlackrockIO
  182. session = BlackrockIO(
  183. dirname,
  184. verbose=False, **param[1])
  185. block = session.read_block(load_waveforms=True)
  186. # Check if analog data are equal
  187. self.assertGreater(len(block.channel_indexes), 0)
  188. for i, chidx in enumerate(block.channel_indexes):
  189. # Break for ChannelIndexes for Units that don't contain any Analogsignals
  190. if len(chidx.analogsignals) == 0 and len(chidx.units) >= 1:
  191. break
  192. # Should only have one AnalogSignal per ChannelIndex
  193. self.assertEqual(len(chidx.analogsignals), 1)
  194. # Find out channel_id in order to compare correctly
  195. idx = chidx.analogsignals[0].annotations['channel_id']
  196. # Get data of AnalogSignal without pq.units
  197. anasig = np.squeeze(chidx.analogsignals[0].base[:].magnitude)
  198. # Test for equality of first nonzero values of AnalogSignal
  199. # and matlab file contents
  200. # If not equal test if hardcoded gain is responsible for this
  201. # See BlackrockRawIO ll. 1420 commit 77a645655605ae39eca2de3ee511f3b522f11bd7
  202. j = 0
  203. while anasig[j] == 0:
  204. j += 1
  205. if lfp_ml[i, j] != np.squeeze(chidx.analogsignals[0].base[j].magnitude):
  206. anasig = anasig / 152.592547
  207. anasig = np.round(anasig).astype(int)
  208. # Special case because id 142 is not included in ns2 file
  209. if idx == 143:
  210. idx -= 1
  211. if idx > 128:
  212. idx = idx - 136
  213. assert_equal(anasig, lfp_ml[idx - 1, :])
  214. # Check if spikes are equal
  215. self.assertEqual(len(block.segments), 1)
  216. for st_i in block.segments[0].spiketrains:
  217. channelid = st_i.annotations['channel_id']
  218. unitid = st_i.annotations['unit_id']
  219. # Compare waveforms
  220. matlab_wf = wf_ml[np.nonzero(
  221. np.logical_and(elec_ml == channelid, unit_ml == unitid)), :][0]
  222. # Atleast_2d as correction for waveforms that are saved
  223. # in single dimension in SpikeTrain
  224. # because only one waveform is available
  225. assert_equal(np.atleast_2d(np.squeeze(st_i.waveforms).magnitude), matlab_wf)
  226. # Compare spike timestamps
  227. matlab_spikes = ts_ml[np.nonzero(
  228. np.logical_and(elec_ml == channelid, unit_ml == unitid))]
  229. # Going sure that unit is really seconds and not 1/30000 seconds
  230. if (not st_i.units == pq.CompoundUnit("1.0/{0} * s".format(30000))) and \
  231. st_i.units == pq.s:
  232. st_i = np.round(st_i.base * 30000).astype(int)
  233. assert_equal(st_i, matlab_spikes)
  234. # Check if digital input port events are equal
  235. self.assertGreater(len(block.segments[0].events), 0)
  236. for ea_i in block.segments[0].events:
  237. if ea_i.name == 'digital_input_port':
  238. # Get all digital event IDs in this recording
  239. marker_ids = set(ea_i.labels)
  240. for marker_id in marker_ids:
  241. python_digievents = np.round(
  242. ea_i.times.base[ea_i.labels == marker_id] * 30000).astype(int)
  243. matlab_digievents = mts_ml[
  244. np.nonzero(mid_ml == int(marker_id))]
  245. assert_equal(python_digievents, matlab_digievents)
  246. # Note: analog input events are not yet supported
  247. def test_segment_detection_reset(self):
  248. """
  249. This test makes sure segments are detected correctly when reset was used during recording.
  250. """
  251. # Path to nev that will fail
  252. filename_nev_fail = self.get_filename_path('segment/ResetFail/reset_fail')
  253. # Path to nsX and nev that will NOT fail
  254. filename = self.get_filename_path('segment/ResetCorrect/reset')
  255. # Warning filter needs to be set to always before first occurrence of this warning
  256. warnings.simplefilter("always", UserWarning)
  257. # This fails, because in the nev there is no way to separate two segments
  258. with self.assertRaises(AssertionError):
  259. reader = BlackrockIO(filename=filename, nsx_to_load=2, nev_override=filename_nev_fail)
  260. # The correct file will issue a warning because a reset has occurred
  261. # and could be detected, but was not explicitly documented in the file
  262. with warnings.catch_warnings(record=True) as w:
  263. reader = BlackrockIO(filename=filename, nsx_to_load=2)
  264. self.assertGreaterEqual(len(w), 1)
  265. messages = [str(warning.message) for warning in w if warning.category == UserWarning]
  266. self.assertIn("Detected 1 undocumented segments within nev data after "
  267. "timestamps [5451].", messages)
  268. # Manually reset warning filter in order to not show too many warnings afterwards
  269. warnings.simplefilter("default")
  270. block = reader.read_block(load_waveforms=False, signal_group_mode="split-all")
  271. # 1 Segment at the beginning and 1 after reset
  272. self.assertEqual(len(block.segments), 2)
  273. # Checking all times are correct as read from file itself
  274. # (taking neo calculations into account)
  275. self.assertEqual(block.segments[0].t_start, 0.0)
  276. self.assertEqual(block.segments[0].t_stop, 4.02)
  277. # Clock is reset to 0
  278. self.assertEqual(block.segments[1].t_start, 0.0032)
  279. self.assertEqual(block.segments[1].t_stop, 3.9842)
  280. self.assertEqual(block.segments[0].analogsignals[0].t_start, 0.0)
  281. self.assertEqual(block.segments[0].analogsignals[0].t_stop, 4.02)
  282. self.assertEqual(block.segments[1].analogsignals[0].t_start, 0.0032)
  283. self.assertEqual(block.segments[1].analogsignals[0].t_stop, 3.9842)
  284. self.assertEqual(block.segments[0].spiketrains[0].t_start, 0.0)
  285. self.assertEqual(block.segments[0].spiketrains[0].t_stop, 4.02)
  286. self.assertEqual(block.segments[1].spiketrains[0].t_start, 0.0032)
  287. self.assertEqual(block.segments[1].spiketrains[0].t_stop, 3.9842)
  288. # Each segment must have the same number of analogsignals
  289. self.assertEqual(len(block.segments[0].analogsignals),
  290. len(block.segments[1].analogsignals))
  291. # Length of analogsignals as created
  292. self.assertEqual(len(block.segments[0].analogsignals[0][:]), 4020)
  293. self.assertEqual(len(block.segments[1].analogsignals[0][:]), 3981)
  294. def test_segment_detection_pause(self):
  295. """
  296. This test makes sure segments are detected correctly when pause was used during recording.
  297. """
  298. # Path to nev that has spikes that don't fit nsX segment
  299. filename_nev_outside_seg = self.get_filename_path(
  300. 'segment/PauseSpikesOutside/pause_spikes_outside_seg')
  301. # Path to nsX and nev that are correct
  302. filename = self.get_filename_path('segment/PauseCorrect/pause_correct')
  303. # This issues a warning, because there are spikes a long time after the last segment
  304. # And another one because there are spikes between segments
  305. with warnings.catch_warnings(record=True) as w:
  306. warnings.simplefilter("always")
  307. reader = BlackrockIO(filename=filename, nsx_to_load=2,
  308. nev_override=filename_nev_outside_seg)
  309. self.assertGreaterEqual(len(w), 2)
  310. # Check that warnings are correct
  311. messages = [str(warning.message) for warning in w if warning.category == UserWarning]
  312. self.assertIn('Spikes outside any segment. Detected on segment #1', messages)
  313. self.assertIn('Spikes 0.0776s after last segment.', messages)
  314. block = reader.read_block(load_waveforms=False, signal_group_mode="split-all")
  315. # 2 segments
  316. self.assertEqual(len(block.segments), 2)
  317. # Checking all times are correct as read from file itself
  318. # (taking neo calculations into account)
  319. self.assertEqual(block.segments[0].t_start, 0.0)
  320. # This value is so high, because a spike occurred right before the second segment
  321. # And thus is added to the first segment
  322. # This is not normal behavior and occurs because of the way the files were cut
  323. # into test files
  324. self.assertAlmostEqual(block.segments[0].t_stop.magnitude, 15.83916667)
  325. # Clock is not reset
  326. self.assertEqual(block.segments[1].t_start.magnitude, 31.0087)
  327. # Segment time is longer here as well because of spikes after second segment
  328. self.assertEqual(block.segments[1].t_stop.magnitude, 35.0863)
  329. self.assertEqual(block.segments[0].analogsignals[0].t_start, 0.0)
  330. # The AnalogSignal is only 4 seconds long, as opposed to the segment
  331. # whose length is caused by the additional spike
  332. self.assertEqual(block.segments[0].analogsignals[0].t_stop, 4.0)
  333. self.assertEqual(block.segments[1].analogsignals[0].t_start, 31.0087)
  334. self.assertAlmostEqual(block.segments[1].analogsignals[0].t_stop.magnitude, 35.0087,
  335. places=6)
  336. self.assertEqual(block.segments[0].spiketrains[0].t_start, 0.0)
  337. self.assertAlmostEqual(block.segments[0].spiketrains[0].t_stop.magnitude, 15.83916667,
  338. places=8)
  339. self.assertEqual(block.segments[1].spiketrains[0].t_start, 31.0087)
  340. self.assertEqual(block.segments[1].spiketrains[0].t_stop, 35.0863)
  341. # Each segment has same number of analogsignals
  342. self.assertEqual(len(block.segments[0].analogsignals),
  343. len(block.segments[1].analogsignals))
  344. # Analogsignals have exactly 4000 samples
  345. self.assertEqual(len(block.segments[0].analogsignals[0][:]), 4000)
  346. self.assertEqual(len(block.segments[1].analogsignals[0][:]), 4000)
  347. # This case is correct, no spikes outside segment or anything
  348. reader = BlackrockIO(filename=filename, nsx_to_load=2)
  349. block = reader.read_block(load_waveforms=False, signal_group_mode="split-all")
  350. # 2 segments
  351. self.assertEqual(len(block.segments), 2)
  352. # Checking all times are correct as read from file itself
  353. # (taking neo calculations into account)
  354. self.assertEqual(block.segments[0].t_start, 0.0)
  355. # Now segment time is only 4 seconds, because there were no additional spikes
  356. self.assertEqual(block.segments[0].t_stop, 4.0)
  357. self.assertEqual(block.segments[1].t_start, 31.0087)
  358. self.assertAlmostEqual(block.segments[1].t_stop.magnitude, 35.0087, places=6)
  359. self.assertEqual(block.segments[0].analogsignals[0].t_start, 0.0)
  360. self.assertEqual(block.segments[0].analogsignals[0].t_stop, 4.0)
  361. self.assertEqual(block.segments[1].analogsignals[0].t_start, 31.0087)
  362. self.assertAlmostEqual(block.segments[1].analogsignals[0].t_stop.magnitude, 35.0087,
  363. places=6)
  364. self.assertEqual(block.segments[0].spiketrains[0].t_start, 0.0)
  365. self.assertEqual(block.segments[0].spiketrains[0].t_stop, 4.0)
  366. self.assertEqual(block.segments[1].spiketrains[0].t_start, 31.0087)
  367. self.assertAlmostEqual(block.segments[1].spiketrains[0].t_stop.magnitude, 35.0087,
  368. places=6)
  369. # Each segment has same number of analogsignals
  370. self.assertEqual(len(block.segments[0].analogsignals),
  371. len(block.segments[1].analogsignals))
  372. # ns2 was created in such a way that all analogsignals have 4000 samples
  373. self.assertEqual(len(block.segments[0].analogsignals[0][:]), 4000)
  374. self.assertEqual(len(block.segments[1].analogsignals[0][:]), 4000)
  375. if __name__ == '__main__':
  376. unittest.main()