123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382 |
- # -*- coding: utf-8 -*-
- """
- Tests of neo.io.blackrockio
- """
- # needed for python 3 compatibility
- from __future__ import absolute_import
- import time
- import warnings
- import unittest
- import numpy as np
- import quantities as pq
- from neo.test.iotest.common_io_test import BaseTestIO
- from neo.core import *
- from neo.io.neuralynxio import NeuralynxIO
- from neo.io.neuralynxio import NeuralynxIO as NewNeuralynxIO
- from neo.io.neuralynxio_v1 import NeuralynxIO as OldNeuralynxIO
- from neo import AnalogSignal
- class CommonNeuralynxIOTest(BaseTestIO, unittest.TestCase, ):
- ioclass = NeuralynxIO
- files_to_test = [
- 'Cheetah_v5.5.1/original_data',
- 'Cheetah_v5.6.3/original_data',
- 'Cheetah_v5.7.4/original_data',
- ]
- files_to_download = [
- 'Cheetah_v5.5.1/original_data/CheetahLogFile.txt',
- 'Cheetah_v5.5.1/original_data/CheetahLostADRecords.txt',
- 'Cheetah_v5.5.1/original_data/Events.nev',
- 'Cheetah_v5.5.1/original_data/STet3a.nse',
- 'Cheetah_v5.5.1/original_data/STet3b.nse',
- 'Cheetah_v5.5.1/original_data/Tet3a.ncs',
- 'Cheetah_v5.5.1/original_data/Tet3b.ncs',
- 'Cheetah_v5.5.1/plain_data/STet3a.txt',
- 'Cheetah_v5.5.1/plain_data/STet3b.txt',
- 'Cheetah_v5.5.1/plain_data/Tet3a.txt',
- 'Cheetah_v5.5.1/plain_data/Tet3b.txt',
- 'Cheetah_v5.5.1/plain_data/Events.txt',
- 'Cheetah_v5.5.1/README.txt',
- 'Cheetah_v5.6.3/original_data/CheetahLogFile.txt',
- 'Cheetah_v5.6.3/original_data/CheetahLostADRecords.txt',
- 'Cheetah_v5.6.3/original_data/Events.nev',
- 'Cheetah_v5.6.3/original_data/CSC1.ncs',
- 'Cheetah_v5.6.3/original_data/CSC2.ncs',
- 'Cheetah_v5.6.3/original_data/TT1.ntt',
- 'Cheetah_v5.6.3/original_data/TT2.ntt',
- 'Cheetah_v5.6.3/original_data/VT1.nvt',
- 'Cheetah_v5.6.3/plain_data/Events.txt',
- 'Cheetah_v5.6.3/plain_data/CSC1.txt',
- 'Cheetah_v5.6.3/plain_data/CSC2.txt',
- 'Cheetah_v5.6.3/plain_data/TT1.txt',
- 'Cheetah_v5.6.3/plain_data/TT2.txt',
- 'Cheetah_v5.6.3/original_data/VT1.nvt',
- 'Cheetah_v5.7.4/original_data/CSC1.ncs',
- 'Cheetah_v5.7.4/original_data/CSC2.ncs',
- 'Cheetah_v5.7.4/original_data/CSC3.ncs',
- 'Cheetah_v5.7.4/original_data/CSC4.ncs',
- 'Cheetah_v5.7.4/original_data/CSC5.ncs',
- 'Cheetah_v5.7.4/original_data/Events.nev',
- 'Cheetah_v5.7.4/plain_data/CSC1.txt',
- 'Cheetah_v5.7.4/plain_data/CSC2.txt',
- 'Cheetah_v5.7.4/plain_data/CSC3.txt',
- 'Cheetah_v5.7.4/plain_data/CSC4.txt',
- 'Cheetah_v5.7.4/plain_data/CSC5.txt',
- 'Cheetah_v5.7.4/plain_data/Events.txt',
- 'Cheetah_v5.7.4/README.txt']
- class TestCheetah_v551(CommonNeuralynxIOTest, unittest.TestCase):
- cheetah_version = '5.5.1'
- files_to_test = []
- def test_read_block(self):
- """Read data in a certain time range into one block"""
- dirname = self.get_filename_path('Cheetah_v5.5.1/original_data')
- nio = NeuralynxIO(dirname=dirname, use_cache=False)
- block = nio.read_block()
- # Everything put in one segment
- self.assertEqual(len(block.segments), 2)
- seg = block.segments[0]
- self.assertEqual(len(seg.analogsignals), 1)
- self.assertEqual(seg.analogsignals[0].shape[-1], 2)
- self.assertEqual(seg.analogsignals[0].sampling_rate, 32. * pq.kHz)
- self.assertEqual(len(seg.spiketrains), 2)
- # Testing different parameter combinations
- block = nio.read_block(lazy=True)
- self.assertEqual(len(block.segments[0].analogsignals[0]), 0)
- self.assertEqual(len(block.segments[0].spiketrains[0]), 0)
- block = nio.read_block(load_waveforms=True)
- self.assertEqual(len(block.segments[0].analogsignals), 1)
- self.assertEqual(len(block.segments[0].spiketrains), 2)
- self.assertEqual(block.segments[0].spiketrains[0].waveforms.shape[0],
- block.segments[0].spiketrains[0].shape[0])
- self.assertGreater(len(block.segments[0].events), 0)
- self.assertEqual(len(block.channel_indexes[-1].units[0].spiketrains), 2) # 2 segment
- block = nio.read_block(load_waveforms=True, units_group_mode='all-in-one')
- self.assertEqual(len(block.channel_indexes[-1].units), 2) # 2 units
- block = nio.read_block(load_waveforms=True, units_group_mode='split-all')
- self.assertEqual(len(block.channel_indexes[-1].units), 1) # 1 units by ChannelIndex
- def test_read_segment(self):
- dirname = self.get_filename_path('Cheetah_v5.5.1/original_data')
- nio = NeuralynxIO(dirname=dirname, use_cache=False)
- # read first segment entirely
- seg = nio.read_segment(seg_index=0, time_slice=None)
- self.assertEqual(len(seg.analogsignals), 1)
- self.assertEqual(seg.analogsignals[0].shape[-1], 2)
- self.assertEqual(seg.analogsignals[0].sampling_rate, 32 * pq.kHz)
- self.assertEqual(len(seg.spiketrains), 2)
- # Testing different parameter combinations
- seg = nio.read_segment(seg_index=0, lazy=True)
- self.assertEqual(seg.analogsignals[0].size, 0)
- self.assertEqual(seg.spiketrains[0].size, 0)
- seg = nio.read_segment(seg_index=0, load_waveforms=True)
- self.assertEqual(len(seg.analogsignals), 1)
- self.assertEqual(len(seg.spiketrains), 2)
- self.assertTrue(len(seg.spiketrains[0].waveforms) > 0)
- self.assertTrue(len(seg.events) > 0)
- class TestCheetah_v563(CommonNeuralynxIOTest, unittest.TestCase):
- cheetah_version = '5.6.3'
- files_to_test = []
- def test_read_block(self):
- """Read data in a certain time range into one block"""
- dirname = self.get_filename_path('Cheetah_v5.6.3/original_data')
- nio = NeuralynxIO(dirname=dirname, use_cache=False)
- block = nio.read_block()
- # There are two segments due to gap in recording
- self.assertEqual(len(block.segments), 2)
- for seg in block.segments:
- self.assertEqual(len(seg.analogsignals), 1)
- self.assertEqual(seg.analogsignals[0].shape[-1], 2)
- self.assertEqual(seg.analogsignals[0].sampling_rate, 2. * pq.kHz)
- self.assertEqual(len(seg.spiketrains), 8)
- # Testing different parameter combinations
- block = nio.read_block(lazy=True)
- self.assertEqual(len(block.segments[0].analogsignals[0]), 0)
- self.assertEqual(len(block.segments[0].spiketrains[0]), 0)
- block = nio.read_block(load_waveforms=True)
- self.assertEqual(len(block.segments[0].analogsignals), 1)
- self.assertEqual(len(block.segments[0].spiketrains), 8)
- self.assertEqual(block.segments[0].spiketrains[0].waveforms.shape[0],
- block.segments[0].spiketrains[0].shape[0])
- # this is tetrode data, containing 32 samples per waveform
- self.assertEqual(block.segments[0].spiketrains[0].waveforms.shape[1], 4)
- self.assertEqual(block.segments[0].spiketrains[0].waveforms.shape[-1], 32)
- self.assertGreater(len(block.segments[0].events), 0)
- self.assertEqual(len(block.channel_indexes[-1].units[0].spiketrains), 2)
- block = nio.read_block(load_waveforms=True, units_group_mode='all-in-one')
- self.assertEqual(len(block.channel_indexes[-1].units), 8)
- block = nio.read_block(load_waveforms=True, units_group_mode='split-all')
- self.assertEqual(len(block.channel_indexes[-1].units), 1) # 1 units by ChannelIndex
- def test_read_segment(self):
- dirname = self.get_filename_path('Cheetah_v5.5.1/original_data')
- nio = NeuralynxIO(dirname=dirname, use_cache=False)
- # read first segment entirely
- seg = nio.read_segment(seg_index=0, time_slice=None)
- self.assertEqual(len(seg.analogsignals), 1)
- self.assertEqual(seg.analogsignals[0].shape[-1], 2)
- self.assertEqual(seg.analogsignals[0].sampling_rate, 32 * pq.kHz)
- self.assertEqual(len(seg.spiketrains), 2)
- # Testing different parameter combinations
- seg = nio.read_segment(seg_index=0, lazy=True)
- self.assertEqual(seg.analogsignals[0].size, 0)
- self.assertEqual(seg.spiketrains[0].size, 0)
- seg = nio.read_segment(seg_index=0, load_waveforms=True)
- self.assertEqual(len(seg.analogsignals), 1)
- self.assertEqual(len(seg.spiketrains), 2)
- self.assertTrue(len(seg.spiketrains[0].waveforms) > 0)
- self.assertTrue(len(seg.events) > 0)
- class TestCheetah_v574(CommonNeuralynxIOTest, unittest.TestCase):
- cheetah_version = '5.7.4'
- files_to_test = []
- def test_read_block(self):
- dirname = self.get_filename_path('Cheetah_v5.7.4/original_data')
- nio = NeuralynxIO(dirname=dirname, use_cache=False)
- block = nio.read_block()
- # Everything put in one segment
- seg = block.segments[0]
- self.assertEqual(len(seg.analogsignals), 1)
- self.assertEqual(seg.analogsignals[0].shape[-1], 5)
- self.assertEqual(seg.analogsignals[0].sampling_rate, 32 * pq.kHz)
- self.assertEqual(len(seg.spiketrains), 0) # no nse files available
- # Testing different parameter combinations
- block = nio.read_block(lazy=True)
- self.assertEqual(len(block.segments[0].analogsignals[0]), 0)
- block = nio.read_block(load_waveforms=True)
- self.assertEqual(len(block.segments[0].analogsignals), 1)
- self.assertEqual(len(block.segments[0].spiketrains), 0)
- self.assertGreater(len(block.segments[0].events), 0)
- block = nio.read_block(signal_group_mode='split-all')
- self.assertEqual(len(block.channel_indexes), 5)
- block = nio.read_block(signal_group_mode='group-by-same-units')
- self.assertEqual(len(block.channel_indexes), 1)
- class TestData(CommonNeuralynxIOTest, unittest.TestCase):
- def test_ncs(self):
- for session in self.files_to_test[1:2]: # in the long run this should include all files
- dirname = self.get_filename_path(session)
- nio = NeuralynxIO(dirname=dirname, use_cache=False)
- block = nio.read_block()
- for anasig_id, anasig in enumerate(block.segments[0].analogsignals):
- chid = anasig.channel_index.annotations['channel_id'][anasig_id]
- filename = nio.ncs_filenames[chid][:-3] + 'txt'
- filename = filename.replace('original_data', 'plain_data')
- plain_data = np.loadtxt(filename)[:, 5:].flatten() # first columns are meta info
- overlap = 512 * 500
- gain_factor_0 = plain_data[0] / anasig.magnitude[0, 0]
- np.testing.assert_allclose(plain_data[:overlap],
- anasig.magnitude[:overlap, 0] * gain_factor_0,
- rtol=0.01)
- class TestGaps(CommonNeuralynxIOTest, unittest.TestCase):
- def test_gap_handling_v551(self):
- dirname = self.get_filename_path('Cheetah_v5.5.1/original_data')
- nio = NeuralynxIO(dirname=dirname, use_cache=False)
- block = nio.read_block()
- # known gap values
- n_gaps = 1
- # so 2 segments, 2 anasigs by Channelindex, 2 SpikeTrain by Units
- self.assertEqual(len(block.segments), n_gaps + 1)
- self.assertEqual(len(block.channel_indexes[0].analogsignals), n_gaps + 1)
- self.assertEqual(len(block.channel_indexes[-1].units[0].spiketrains), n_gaps + 1)
- def test_gap_handling_v563(self):
- dirname = self.get_filename_path('Cheetah_v5.6.3/original_data')
- nio = NeuralynxIO(dirname=dirname, use_cache=False)
- block = nio.read_block()
- # known gap values
- n_gaps = 1
- # so 2 segments, 2 anasigs by Channelindex, 2 SpikeTrain by Units
- self.assertEqual(len(block.segments), n_gaps + 1)
- self.assertEqual(len(block.channel_indexes[0].analogsignals), n_gaps + 1)
- self.assertEqual(len(block.channel_indexes[-1].units[0].spiketrains), n_gaps + 1)
- def compare_old_and_new_neuralynxio():
- base = '/tmp/files_for_testing_neo/neuralynx/'
- dirname = base + 'Cheetah_v5.5.1/original_data/'
- # ~ dirname = base+'Cheetah_v5.7.4/original_data/'
- t0 = time.perf_counter()
- newreader = NewNeuralynxIO(dirname)
- t1 = time.perf_counter()
- bl1 = newreader.read_block(load_waveforms=True)
- t2 = time.perf_counter()
- print('newreader header', t1 - t0, 's')
- print('newreader data', t2 - t1, 's')
- print('newreader toal', t2 - t0, 's')
- for seg in bl1.segments:
- print('seg', seg.index)
- for anasig in seg.analogsignals:
- print(' AnalogSignal', anasig.name, anasig.shape, anasig.t_start)
- for st in seg.spiketrains:
- print(' SpikeTrain', st.name, st.shape, st.waveforms.shape, st[:5])
- for ev in seg.events:
- print(' Event', ev.name, ev.times.shape)
- print('*' * 10)
- t0 = time.perf_counter()
- oldreader = OldNeuralynxIO(sessiondir=dirname, use_cache='never')
- t1 = time.perf_counter()
- bl2 = oldreader.read_block(waveforms=True, events=True)
- t2 = time.perf_counter()
- print('oldreader header', t1 - t0, 's')
- print('oldreader data', t2 - t1, 's')
- print('oldreader toal', t2 - t0, 's')
- for seg in bl2.segments:
- print('seg', seg.index)
- for anasig in seg.analogsignals:
- print(' AnalogSignal', anasig.name, anasig.shape, anasig.t_start)
- for st in seg.spiketrains:
- print(' SpikeTrain', st.name, st.shape, st.waveforms.shape, st[:5])
- for ev in seg.events:
- print(' Event', ev.name, ev.times.shape)
- print('*' * 10)
- compare_neo_content(bl1, bl2)
- def compare_neo_content(bl1, bl2):
- print('*' * 5, 'Comparison of blocks', '*' * 5)
- object_types_to_test = [Segment, ChannelIndex, Unit, AnalogSignal,
- SpikeTrain, Event, Epoch]
- for objtype in object_types_to_test:
- print('Testing {}'.format(objtype))
- children1 = bl1.list_children_by_class(objtype)
- children2 = bl2.list_children_by_class(objtype)
- if len(children1) != len(children2):
- warnings.warn('Number of {} is different in both blocks ({} != {}).'
- ' Skipping comparison'
- ''.format(objtype, len(children1), len(children2)))
- continue
- for child1, child2 in zip(children1, children2):
- compare_annotations(child1.annotations, child2.annotations)
- compare_attributes(child1, child2)
- def compare_annotations(anno1, anno2):
- if len(anno1) != len(anno2):
- warnings.warn('Different numbers of annotations! {} != {}\nSkipping further comparison of '
- 'this annotation list.'.format(anno1.keys(), anno2.keys()))
- return
- assert anno1.keys() == anno2.keys()
- for key in anno1.keys():
- anno1[key] = anno2[key]
- def compare_attributes(child1, child2):
- assert child1._all_attrs == child2._all_attrs
- for attr_id in range(len(child1._all_attrs)):
- attr_name = child1._all_attrs[attr_id][0]
- attr_dtype = child1._all_attrs[attr_id][1]
- if type(child1) == AnalogSignal and attr_name == 'signal':
- continue
- if type(child1) == SpikeTrain and attr_name == 'times':
- continue
- unequal = child1.__getattribute__(attr_name) != child2.__getattribute__(attr_name)
- if hasattr(unequal, 'any'):
- unequal = unequal.any()
- if unequal:
- warnings.warn('Attributes differ! {}.{}={} is not equal to {}.{}={}'
- ''.format(child1.__class__.__name__, attr_name,
- child1.__getattribute__(attr_name),
- child2.__class__.__name__, attr_name,
- child2.__getattribute__(attr_name)))
- if __name__ == '__main__':
- unittest.main()
- # ~ compare_old_and_new_neuralynxio()
|