123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161 |
- # -*- coding: utf-8 -*-
- '''
- Common tests for RawIOs:
- It is copy/paste from neo/test/iotests/common_io_test.py
- The code should be shared for common parts.
- The public URL is in url_for_tests.
- To deposit new testing files, please create an account at
- gin.g-node.org and upload the files to the NeuralEnsemble/ephy_testing_data
- data repo.
- '''
- # needed for python 3 compatibility
- from __future__ import unicode_literals, print_function, division, absolute_import
- __test__ = False  # NOTE(review): presumably keeps test collectors from treating this module as tests -- confirm
- # url_for_tests = "https://portal.g-node.org/neo/" #This is the old place
- url_for_tests = "https://web.gin.g-node.org/NeuralEnsemble/ephy_testing_data/raw/master/"  # base URL; BaseTestRawIO appends the IO short name to it before downloading
- import os
- import logging
- import unittest
- from neo.rawio.tests.tools import (can_use_network, make_all_directories,
- download_test_file, create_local_temp_dir)
- from neo.rawio.tests import rawio_compliance as compliance
class BaseTestRawIO(object):
    '''
    Common tests shared by all RawIO classes.

    The fixture downloads the test files of one RawIO from the location
    given by url_for_tests into a local temporary directory, then opens
    every entity and runs the compliance checks on it.

    Concrete test classes are expected to override:
      * rawioclass: the RawIO class to be tested
      * entities_to_test: file or directory names, relative to the local
        test directory, that test_read_all opens
      * files_to_download: names handed to download_test_file

    NOTE(review): the setUp/test_* naming suggests this class is meant to be
    combined with unittest.TestCase in the concrete test classes -- confirm.
    '''

    # all IO test need to modify this:
    rawioclass = None  # the IOclass to be tested

    # NOTE(review): test_read_all iterates over self.entities_to_test, which is
    # not declared here and therefore must be provided by the subclass;
    # files_to_test is not read anywhere in this class.
    files_to_test = []  # list of files to test compliances
    files_to_download = []  # when files are at G-Node

    # allow environment to tell avoid using network
    use_network = can_use_network()

    local_test_dir = None

    def setUp(self):
        '''
        Set up the test fixture. This is run for every test.

        Derives self.shortname from the RawIO class name (lower-cased, with
        'rawio' removed), then creates the local directory and downloads the
        test files into it.
        '''
        self.shortname = self.rawioclass.__name__.lower().replace('rawio', '')
        self.create_local_dir_if_not_exists()
        self.download_test_files_if_not_present()

    def create_local_dir_if_not_exists(self):
        '''
        Create a local directory to store testing files and return it.

        The directory path is also written to self.local_test_dir
        '''
        self.local_test_dir = create_local_temp_dir(self.shortname)
        return self.local_test_dir

    def download_test_files_if_not_present(self):
        '''
        Download the files listed in files_to_download for the tested RawIO.

        The remote location is url_for_tests (global at the beginning of this
        file) followed by self.shortname.

        Raises unittest.SkipTest when use_network is false, or when creating
        the directories / downloading fails with an IOError.
        '''
        if not self.use_network:
            raise unittest.SkipTest("Requires download of data from the web")

        url = url_for_tests + self.shortname
        try:
            make_all_directories(self.files_to_download, self.local_test_dir)
            download_test_file(self.files_to_download,
                               self.local_test_dir, url)
        except IOError as exc:
            # a failed download is not a failure of the IO under test
            raise unittest.SkipTest(exc)

    # presumably prevents test collectors from picking this helper up as a test
    download_test_files_if_not_present.__test__ = False

    def cleanup_file(self, path):
        '''
        Remove test files or directories safely.
        '''
        # NOTE(review): cleanup_test_file is not among the names imported at
        # the top of this module, so this call raises NameError unless the
        # name is provided elsewhere -- confirm where the helper lives and
        # import it.
        cleanup_test_file(self.rawioclass, path, directory=self.local_test_dir)

    def get_filename_path(self, filename):
        '''
        Get the path to a filename in the current temporary file directory
        '''
        return os.path.join(self.local_test_dir, filename)

    def test_read_all(self):
        '''
        Open every entity of self.entities_to_test and run the compliance
        checks and a basic read benchmark on it.

        Raises ValueError when rawioclass.rawmode ends with neither '-file'
        nor '-dir', because no reader can be built in that case.
        '''
        for entity_name in self.entities_to_test:
            entity_name = self.get_filename_path(entity_name)

            # the rawmode suffix decides which constructor keyword is used
            if self.rawioclass.rawmode.endswith('-file'):
                reader = self.rawioclass(filename=entity_name)
            elif self.rawioclass.rawmode.endswith('-dir'):
                reader = self.rawioclass(dirname=entity_name)
            else:
                # without this branch `reader` would be unbound, or silently
                # reused from the previous iteration
                raise ValueError(
                    "Unsupported rawmode %r for %s: expected a mode ending "
                    "with '-file' or '-dir'"
                    % (self.rawioclass.rawmode, self.rawioclass.__name__))

            txt = reader.__repr__()
            assert 'nb_block' not in txt, 'Before parser_header() nb_block should be NOT known'

            reader.parse_header()

            txt = reader.__repr__()
            assert 'nb_block' in txt, 'After parser_header() nb_block should be known'

            # called only to check that building the annotation repr works
            reader._repr_annotations()

            # launch a series of compliance tests
            compliance.header_is_total(reader)
            compliance.count_element(reader)
            compliance.read_analogsignals(reader)
            compliance.read_spike_times(reader)
            compliance.read_spike_waveforms(reader)
            compliance.read_events(reader)
            compliance.has_annotations(reader)

            # basic benchmark, run with the root logger temporarily at INFO;
            # the previous level is restored even if the benchmark raises
            root_logger = logging.getLogger()
            level = root_logger.getEffectiveLevel()
            root_logger.setLevel(logging.INFO)
            try:
                compliance.benchmark_speed_read_signals(reader)
            finally:
                root_logger.setLevel(level)
|