|
- """
- Tests of neo.io.blackrockio
- """
- import unittest
- import warnings
- from numpy.testing import assert_equal
- import numpy as np
- import quantities as pq
- from neo.io.blackrockio import BlackrockIO
- from neo.test.iotest.common_io_test import BaseTestIO
- from neo.test.iotest.tools import get_test_file_full_path
- from neo.test.tools import assert_neo_object_is_compliant
# Check whether a usable scipy is available. In this module scipy is used to
# load the Matlab-generated reference files of the comparison test, which is
# skipped when HAVE_SCIPY is False.
#
# The version is parsed by hand rather than with distutils.version.LooseVersion:
# distutils is deprecated (PEP 632) and no longer shipped with Python 3.12+.
# Importing it inside the ``try`` below would raise ImportError there and
# wrongly report scipy as unavailable.
def _leading_int(text):
    """Return the integer formed by the leading digits of *text* (0 if none)."""
    digits = ''
    for char in text:
        if not char.isdigit():
            break
        digits += char
    return int(digits) if digits else 0


try:
    import scipy.io
    import scipy.version
except ImportError as err:
    HAVE_SCIPY = False
    SCIPY_ERR = err
else:
    # Only (major, minor) matter for the ">= 0.8" requirement; suffixes such
    # as "rc1" or ".dev0+abc" are ignored by _leading_int.
    _scipy_release = tuple(_leading_int(part)
                           for part in scipy.version.version.split('.')[:2])
    if _scipy_release < (0, 8):
        HAVE_SCIPY = False
        SCIPY_ERR = ImportError("your scipy version is too old to load the "
                                "Matlab reference files, you need at least 0.8. "
                                "You have %s" % scipy.version.version)
    else:
        HAVE_SCIPY = True
        SCIPY_ERR = None
class CommonTests(BaseTestIO, unittest.TestCase):
    """
    Test case for BlackrockIO.

    The class attributes configure the BaseTestIO mixin (presumably its
    generic read tests and the retrieval of the test data -- confirm against
    BaseTestIO); the methods add Blackrock-specific checks.
    """
    ioclass = BlackrockIO

    # Base names (without extension) of the recordings under test
    files_to_test = ['FileSpec2.3001',
                     'blackrock_2_1/l101210-001']

    # Every data file the tests of this class rely on
    files_to_download = [
        'FileSpec2.3001.nev',
        'FileSpec2.3001.ns5',
        'FileSpec2.3001.ccf',
        'FileSpec2.3001.mat',
        'blackrock_2_1/l101210-001.mat',
        'blackrock_2_1/l101210-001_nev-02_ns5.mat',
        'blackrock_2_1/l101210-001.ns2',
        'blackrock_2_1/l101210-001.ns5',
        'blackrock_2_1/l101210-001.nev',
        'blackrock_2_1/l101210-001-02.nev',
        'segment/PauseCorrect/pause_correct.nev',
        'segment/PauseCorrect/pause_correct.ns2',
        'segment/PauseSpikesOutside/pause_spikes_outside_seg.nev',
        'segment/ResetCorrect/reset.nev',
        'segment/ResetCorrect/reset.ns2',
        'segment/ResetFail/reset_fail.nev',
    ]

    def _assert_out_of_range_slice_handling(self, reader, block):
        """
        Check how ``reader.read_segment`` treats a time slice that starts
        before and ends after the recording.

        With ``strict_slicing=False`` the call must succeed and return as many
        samples and spikes as found in the completely loaded ``block``; with
        ``strict_slicing=True`` it must raise an AssertionError.
        """
        full_signal = block.segments[0].analogsignals[0]
        expected_n_samples = len(full_signal)
        expected_n_spikes = len(block.segments[0].spiketrains[0])

        # Slice using a negative start time and a stop time exceeding the end
        # of the recording
        too_large_tstop = full_signal.t_stop + 1 * pq.s
        buggy_slice = (-100 * pq.ms, too_large_tstop)

        # Accepted when slicing is lenient
        seg = reader.read_segment(seg_index=0, time_slice=buggy_slice, strict_slicing=False)

        # Rejected when slicing is strict
        with self.assertRaises(AssertionError):
            reader.read_segment(seg_index=0, time_slice=buggy_slice, strict_slicing=True)

        # The lenient read should have returned the complete data set
        self.assertEqual(expected_n_samples, len(seg.analogsignals[0]))
        self.assertEqual(expected_n_spikes, len(seg.spiketrains[0]))

    def test_load_waveforms(self):
        """
        Test that a block read with waveforms is a compliant neo object.
        """
        filename = self.get_filename_path('FileSpec2.3001')
        reader = BlackrockIO(filename=filename, verbose=False)

        bl = reader.read_block(load_waveforms=True)
        assert_neo_object_is_compliant(bl)

    def test_inputs_V23(self):
        """
        Test various inputs to BlackrockIO.read_block with version 2.3 file
        to check for parsing errors.
        """
        filename = self.get_filename_path('FileSpec2.3001')
        reader = BlackrockIO(filename=filename, verbose=False, nsx_to_load=5)

        # Assert IOError is raised when no Blackrock files are available
        with self.assertRaises(IOError):
            BlackrockIO(filename='nonexistent')

        # Load data to maximum extent and compare against sliced reads
        block = reader.read_block(load_waveforms=False)
        self._assert_out_of_range_slice_handling(reader, block)

        # One AnalogSignal per channel is expected in 'split-all' mode
        block = reader.read_block(load_waveforms=True,
                                  signal_group_mode='split-all')
        self.assertEqual(len(block.segments[0].analogsignals), 10)
        # NOTE(review): the unit checks below are disabled in this test
        # self.assertEqual(len(block.channel_indexes[-1].units), 4)
        # self.assertEqual(len(block.channel_indexes[-1].units),
        #                  len(block.segments[0].spiketrains))

        anasig = block.segments[0].analogsignals[0]
        self.assertIsNotNone(anasig.file_origin)

    def test_inputs_V21(self):
        """
        Test various inputs to BlackrockIO.read_block with version 2.1 file
        to check for parsing errors.
        """
        filename = self.get_filename_path('blackrock_2_1/l101210-001')
        reader = BlackrockIO(filename=filename, verbose=False, nsx_to_load=5)

        # Assert IOError is raised when no Blackrock files are available
        with self.assertRaises(IOError):
            BlackrockIO(filename='nonexistent')
        # NOTE(review): the nev_override check below is disabled in this test
        # with self.assertRaises(IOError):
        #     BlackrockIO(filename=filename, nev_override='nonexistent')

        # Load data to maximum extent and compare against sliced reads
        block = reader.read_block(load_waveforms=False, signal_group_mode='split-all')
        self._assert_out_of_range_slice_handling(reader, block)

        # One AnalogSignal per channel is expected in 'split-all' mode
        # NOTE(review): a units_group_mode='all-in-one' argument is disabled here
        block = reader.read_block(load_waveforms=True,
                                  signal_group_mode='split-all')
        self.assertEqual(len(block.segments[0].analogsignals), 96)
        # NOTE(review): the unit checks below are disabled in this test
        # self.assertEqual(len(block.channel_indexes[-1].units), 218)
        # self.assertEqual(len(block.channel_indexes[-1].units),
        #                  len(block.segments[0].spiketrains))

        anasig = block.segments[0].analogsignals[0]
        self.assertIsNotNone(anasig.file_origin)

    def test_load_muliple_nsx(self):
        """
        Test if multiple nsx signals can be loaded at the same time.
        """
        filename = self.get_filename_path('blackrock_2_1/l101210-001')
        reader = BlackrockIO(filename=filename, verbose=False, nsx_to_load='all')

        # number of different sampling rates corresponds to number of nsx signals, because
        # single nsx contains only signals of identical sampling rate
        block = reader.read_block(load_waveforms=False)
        sampling_rates = np.unique(
            [a.sampling_rate.rescale('Hz') for a in block.filter(objects='AnalogSignal')])
        self.assertEqual(len(sampling_rates), 2)

        segment = reader.read_segment()
        sampling_rates = np.unique(
            [a.sampling_rate.rescale('Hz') for a in segment.filter(objects='AnalogSignal')])
        self.assertEqual(len(sampling_rates), 2)

        # load only ns5
        reader = BlackrockIO(filename=filename, nsx_to_load=5)
        seg = reader.read_segment()
        self.assertEqual(len(seg.analogsignals), 1)
        self.assertEqual(seg.analogsignals[0].shape, (109224, 96))

        # load only ns2
        reader = BlackrockIO(filename=filename, nsx_to_load=2)
        seg = reader.read_segment()
        self.assertEqual(len(seg.analogsignals), 1)
        self.assertEqual(seg.analogsignals[0].shape, (3640, 6))

        # load only ns2, requested as a one-element list
        reader = BlackrockIO(filename=filename, nsx_to_load=[2])
        seg = reader.read_segment()
        self.assertEqual(len(seg.analogsignals), 1)

        # load ns2 + ns5
        reader = BlackrockIO(filename=filename, nsx_to_load=[2, 5])
        seg = reader.read_segment()
        self.assertEqual(len(seg.analogsignals), 2)
        self.assertEqual(seg.analogsignals[0].shape, (3640, 6))
        self.assertEqual(seg.analogsignals[1].shape, (109224, 96))

        # 'max' is expected to load only ns5 (same shape as the ns5 load above)
        reader = BlackrockIO(filename=filename, nsx_to_load='max')
        seg = reader.read_segment()
        self.assertEqual(len(seg.analogsignals), 1)
        self.assertEqual(seg.analogsignals[0].shape, (109224, 96))

    @unittest.skipUnless(HAVE_SCIPY, "requires scipy")
    def test_compare_blackrockio_with_matlabloader_v21(self):
        """
        This test compares the output of BlackrockIO.read_block() with the
        output generated by a Matlab implementation of a Blackrock file reader
        provided by the company. The output for comparison is provided in a
        .mat file created by the script create_data_matlab_blackrock.m.
        The function tests LFPs, spike times, and digital events.
        """
        dirname = get_test_file_full_path(ioclass=BlackrockIO,
                                          filename='blackrock_2_1/l101210-001',
                                          directory=self.local_test_dir, clean=False)

        # First run with parameters for ns5, then run with correct parameters for ns2
        parameters = [('blackrock_2_1/l101210-001_nev-02_ns5.mat',
                       {'nsx_to_load': 5, 'nev_override': '-'.join([dirname, '02'])}),
                      ('blackrock_2_1/l101210-001.mat', {'nsx_to_load': 2})]

        for mat_file, io_kwargs in parameters:
            # Load data from matlab generated files
            ml = scipy.io.loadmat(
                get_test_file_full_path(
                    ioclass=BlackrockIO,
                    filename=mat_file,
                    directory=self.local_test_dir, clean=False))
            lfp_ml = ml['lfp']  # (channel x time) LFP matrix
            ts_ml = ml['ts']  # spike time stamps
            elec_ml = ml['el']  # spike electrodes
            unit_ml = ml['un']  # spike unit IDs
            wf_ml = ml['wf']  # waveforms
            mts_ml = ml['mts']  # marker time stamps
            mid_ml = ml['mid']  # marker IDs

            # Load data from original data files using the Neo BlackrockIO
            session = BlackrockIO(
                dirname,
                verbose=False, **io_kwargs)
            block = session.read_block(load_waveforms=True, signal_group_mode='split-all')

            # Check if analog data are equal
            # NOTE(review): block.groups is asserted but block.channel_indexes is
            # iterated -- confirm both exist in the targeted neo version
            self.assertGreater(len(block.groups), 0)
            for i, chidx_grp in enumerate(block.channel_indexes):
                # Break for ChannelIndexes for Units that don't contain any Analogsignals
                if len(chidx_grp.analogsignals) == 0 and len(chidx_grp.spiketrains) >= 1:
                    break
                # Should only have one AnalogSignal per ChannelIndex-representing Group
                self.assertEqual(len(chidx_grp.analogsignals), 1)

                # Find out channel_id in order to compare correctly
                idx = chidx_grp.analogsignals[0].annotations['channel_id']
                # Get data of AnalogSignal without pq.units
                anasig = np.squeeze(chidx_grp.analogsignals[0].base[:].magnitude)
                # Test for equality of first nonzero values of AnalogSignal
                # and matlab file contents
                # If not equal test if hardcoded gain is responsible for this
                # See BlackrockRawIO ll. 1420 commit 77a645655605ae39eca2de3ee511f3b522f11bd7
                # An all-zero signal has no nonzero sample; fall back to the
                # first sample instead of running past the end of the array
                nonzero_positions = np.flatnonzero(anasig)
                j = nonzero_positions[0] if nonzero_positions.size else 0
                if lfp_ml[i, j] != np.squeeze(chidx_grp.analogsignals[0].base[j].magnitude):
                    anasig = anasig / 152.592547
                    anasig = np.round(anasig).astype(int)

                # Special case because id 142 is not included in ns2 file
                if idx == 143:
                    idx -= 1
                if idx > 128:
                    idx = idx - 136

                assert_equal(anasig, lfp_ml[idx - 1, :])

            # Check if spikes are equal
            self.assertEqual(len(block.segments), 1)
            for st_i in block.segments[0].spiketrains:
                channelid = st_i.annotations['channel_id']
                unitid = st_i.annotations['unit_id']

                # Entries of the Matlab data belonging to this channel and unit
                unit_mask = np.logical_and(elec_ml == channelid, unit_ml == unitid)

                # Compare waveforms
                matlab_wf = wf_ml[np.nonzero(unit_mask), :][0]
                # Atleast_2d as correction for waveforms that are saved
                # in single dimension in SpikeTrain
                # because only one waveform is available
                assert_equal(np.atleast_2d(np.squeeze(st_i.waveforms).magnitude), matlab_wf)

                # Compare spike timestamps
                matlab_spikes = ts_ml[np.nonzero(unit_mask)]
                # Going sure that unit is really seconds and not 1/30000 seconds
                spike_times = st_i
                if (not st_i.units == pq.CompoundUnit("1.0/{} * s".format(30000))) and \
                        st_i.units == pq.s:
                    spike_times = np.round(st_i.base * 30000).astype(int)
                assert_equal(spike_times, matlab_spikes)

            # Check if digital input port events are equal
            self.assertGreater(len(block.segments[0].events), 0)
            for ea_i in block.segments[0].events:
                if ea_i.name == 'digital_input_port':
                    # Get all digital event IDs in this recording
                    marker_ids = set(ea_i.labels)
                    for marker_id in marker_ids:
                        python_digievents = np.round(
                            ea_i.times.base[ea_i.labels == marker_id] * 30000).astype(int)
                        matlab_digievents = mts_ml[
                            np.nonzero(mid_ml == int(marker_id))]
                        assert_equal(python_digievents, matlab_digievents)

                        # Note: analog input events are not yet supported

    def test_segment_detection_reset(self):
        """
        This test makes sure segments are detected correctly when reset was used during recording.
        """
        # Path to nev that will fail
        filename_nev_fail = self.get_filename_path('segment/ResetFail/reset_fail')
        # Path to nsX and nev that will NOT fail
        filename = self.get_filename_path('segment/ResetCorrect/reset')

        # The warning filter needs to be set to always before the first
        # occurrence of the warning. It is changed inside catch_warnings so
        # that the previous filters are restored afterwards, even if an
        # assertion fails, and other tests are not affected.
        with warnings.catch_warnings():
            warnings.simplefilter("always", UserWarning)

            # This fails, because in the nev there is no way to separate two segments
            with self.assertRaises(AssertionError):
                BlackrockIO(filename=filename, nsx_to_load=2, nev_override=filename_nev_fail)

            # The correct file will issue a warning because a reset has occurred
            # and could be detected, but was not explicitly documented in the file
            with warnings.catch_warnings(record=True) as w:
                reader = BlackrockIO(filename=filename, nsx_to_load=2)

        self.assertGreaterEqual(len(w), 1)
        messages = [str(warning.message) for warning in w if warning.category == UserWarning]
        self.assertIn("Detected 1 undocumented segments within nev data after "
                      "timestamps [5451].", messages)

        block = reader.read_block(load_waveforms=False, signal_group_mode="split-all")

        # 1 Segment at the beginning and 1 after reset
        self.assertEqual(len(block.segments), 2)

        # Checking all times are correct as read from file itself
        # (taking neo calculations into account)
        self.assertEqual(block.segments[0].t_start, 0.0)
        self.assertEqual(block.segments[0].t_stop, 4.02)
        # Clock is reset to 0
        self.assertEqual(block.segments[1].t_start, 0.0032)
        self.assertEqual(block.segments[1].t_stop, 3.9842)

        self.assertEqual(block.segments[0].analogsignals[0].t_start, 0.0)
        self.assertEqual(block.segments[0].analogsignals[0].t_stop, 4.02)
        self.assertEqual(block.segments[1].analogsignals[0].t_start, 0.0032)
        self.assertEqual(block.segments[1].analogsignals[0].t_stop, 3.9842)

        self.assertEqual(block.segments[0].spiketrains[0].t_start, 0.0)
        self.assertEqual(block.segments[0].spiketrains[0].t_stop, 4.02)
        self.assertEqual(block.segments[1].spiketrains[0].t_start, 0.0032)
        self.assertEqual(block.segments[1].spiketrains[0].t_stop, 3.9842)

        # Each segment must have the same number of analogsignals
        self.assertEqual(len(block.segments[0].analogsignals),
                         len(block.segments[1].analogsignals))

        # Length of analogsignals as created
        self.assertEqual(len(block.segments[0].analogsignals[0][:]), 4020)
        self.assertEqual(len(block.segments[1].analogsignals[0][:]), 3981)

    def test_segment_detection_pause(self):
        """
        This test makes sure segments are detected correctly when pause was used during recording.
        """
        # Path to nev that has spikes that don't fit nsX segment
        filename_nev_outside_seg = self.get_filename_path(
            'segment/PauseSpikesOutside/pause_spikes_outside_seg')
        # Path to nsX and nev that are correct
        filename = self.get_filename_path('segment/PauseCorrect/pause_correct')

        # This issues a warning, because there are spikes a long time after the last segment
        # And another one because there are spikes between segments
        with warnings.catch_warnings(record=True) as w:
            warnings.simplefilter("always")
            reader = BlackrockIO(filename=filename, nsx_to_load=2,
                                 nev_override=filename_nev_outside_seg)

        self.assertGreaterEqual(len(w), 2)

        # Check that warnings are correct
        messages = [str(warning.message) for warning in w if warning.category == UserWarning]
        self.assertIn('Spikes outside any segment. Detected on segment #1', messages)
        self.assertIn('Spikes 0.0776s after last segment.', messages)

        block = reader.read_block(load_waveforms=False, signal_group_mode="split-all")

        # 2 segments
        self.assertEqual(len(block.segments), 2)

        # Checking all times are correct as read from file itself
        # (taking neo calculations into account)
        self.assertEqual(block.segments[0].t_start, 0.0)
        # This value is so high, because a spike occurred right before the second segment
        # And thus is added to the first segment
        # This is not normal behavior and occurs because of the way the files were cut
        # into test files
        self.assertAlmostEqual(block.segments[0].t_stop.magnitude, 15.83916667)
        # Clock is not reset
        self.assertEqual(block.segments[1].t_start.magnitude, 31.0087)
        # Segment time is longer here as well because of spikes after second segment
        self.assertEqual(block.segments[1].t_stop.magnitude, 35.0863)

        self.assertEqual(block.segments[0].analogsignals[0].t_start, 0.0)
        # The AnalogSignal is only 4 seconds long, as opposed to the segment
        # whose length is caused by the additional spike
        self.assertEqual(block.segments[0].analogsignals[0].t_stop, 4.0)
        self.assertEqual(block.segments[1].analogsignals[0].t_start, 31.0087)
        self.assertAlmostEqual(block.segments[1].analogsignals[0].t_stop.magnitude, 35.0087,
                               places=6)

        self.assertEqual(block.segments[0].spiketrains[0].t_start, 0.0)
        self.assertAlmostEqual(block.segments[0].spiketrains[0].t_stop.magnitude, 15.83916667,
                               places=8)
        self.assertEqual(block.segments[1].spiketrains[0].t_start, 31.0087)
        self.assertEqual(block.segments[1].spiketrains[0].t_stop, 35.0863)

        # Each segment has same number of analogsignals
        self.assertEqual(len(block.segments[0].analogsignals),
                         len(block.segments[1].analogsignals))

        # Analogsignals have exactly 4000 samples
        self.assertEqual(len(block.segments[0].analogsignals[0][:]), 4000)
        self.assertEqual(len(block.segments[1].analogsignals[0][:]), 4000)

        # This case is correct, no spikes outside segment or anything
        reader = BlackrockIO(filename=filename, nsx_to_load=2)
        block = reader.read_block(load_waveforms=False, signal_group_mode="split-all")

        # 2 segments
        self.assertEqual(len(block.segments), 2)

        # Checking all times are correct as read from file itself
        # (taking neo calculations into account)
        self.assertEqual(block.segments[0].t_start, 0.0)
        # Now segment time is only 4 seconds, because there were no additional spikes
        self.assertEqual(block.segments[0].t_stop, 4.0)
        self.assertEqual(block.segments[1].t_start, 31.0087)
        self.assertAlmostEqual(block.segments[1].t_stop.magnitude, 35.0087, places=6)

        self.assertEqual(block.segments[0].analogsignals[0].t_start, 0.0)
        self.assertEqual(block.segments[0].analogsignals[0].t_stop, 4.0)
        self.assertEqual(block.segments[1].analogsignals[0].t_start, 31.0087)
        self.assertAlmostEqual(block.segments[1].analogsignals[0].t_stop.magnitude, 35.0087,
                               places=6)

        self.assertEqual(block.segments[0].spiketrains[0].t_start, 0.0)
        self.assertEqual(block.segments[0].spiketrains[0].t_stop, 4.0)
        self.assertEqual(block.segments[1].spiketrains[0].t_start, 31.0087)
        self.assertAlmostEqual(block.segments[1].spiketrains[0].t_stop.magnitude, 35.0087,
                               places=6)

        # Each segment has same number of analogsignals
        self.assertEqual(len(block.segments[0].analogsignals),
                         len(block.segments[1].analogsignals))

        # ns2 was created in such a way that all analogsignals have 4000 samples
        self.assertEqual(len(block.segments[0].analogsignals[0][:]), 4000)
        self.assertEqual(len(block.segments[1].analogsignals[0][:]), 4000)
# Run the tests of this module when it is executed directly as a script.
if __name__ == '__main__':
    unittest.main()
|