Scheduled service maintenance on November 22


On Friday, November 22, 2024, between 06:00 CET and 18:00 CET, GIN services will undergo planned maintenance. Extended service interruptions should be expected. We will try to keep downtimes to a minimum, but recommend that users avoid critical tasks, large data uploads, or DOI requests during this time.

We apologize for any inconvenience.

test_brainwaredamio.py 5.6 KB

  1. # -*- coding: utf-8 -*-
  2. """
  3. Tests of neo.io.brainwaredamio
  4. """
  5. # needed for python 3 compatibility
  6. from __future__ import absolute_import, division, print_function
  7. import os.path
  8. import sys
  9. try:
  10. import unittest2 as unittest
  11. except ImportError:
  12. import unittest
  13. import numpy as np
  14. import quantities as pq
  15. from neo.core import (AnalogSignal, Block,
  16. ChannelIndex, Segment)
  17. from neo.io import BrainwareDamIO
  18. from neo.test.iotest.common_io_test import BaseTestIO
  19. from neo.test.tools import (assert_same_sub_schema,
  20. assert_neo_object_is_compliant)
  21. from neo.test.iotest.tools import create_generic_reader
  22. PY_VER = sys.version_info[0]
  23. def proc_dam(filename):
  24. '''Load an dam file that has already been processed by the official matlab
  25. file converter. That matlab data is saved to an m-file, which is then
  26. converted to a numpy '.npz' file. This numpy file is the file actually
  27. loaded. This function converts it to a neo block and returns the block.
  28. This block can be compared to the block produced by BrainwareDamIO to
  29. make sure BrainwareDamIO is working properly
  30. block = proc_dam(filename)
  31. filename: The file name of the numpy file to load. It should end with
  32. '*_dam_py?.npz'. This will be converted to a neo 'file_origin' property
  33. with the value '*.dam', so the filename to compare should fit that pattern.
  34. 'py?' should be 'py2' for the python 2 version of the numpy file or 'py3'
  35. for the python 3 version of the numpy file.
  36. example: filename = 'file1_dam_py2.npz'
  37. dam file name = 'file1.dam'
  38. '''
  39. with np.load(filename) as damobj:
  40. damfile = damobj.items()[0][1].flatten()
  41. filename = os.path.basename(filename[:-12]+'.dam')
  42. signals = [res.flatten() for res in damfile['signal']]
  43. stimIndexes = [int(res[0, 0].tolist()) for res in damfile['stimIndex']]
  44. timestamps = [res[0, 0] for res in damfile['timestamp']]
  45. block = Block(file_origin=filename)
  46. chx = ChannelIndex(file_origin=filename,
  47. index=np.array([0]),
  48. channel_ids=np.array([1]),
  49. channel_names=np.array(['Chan1'], dtype='S'))
  50. block.channel_indexes.append(chx)
  51. params = [res['params'][0, 0].flatten() for res in damfile['stim']]
  52. values = [res['values'][0, 0].flatten() for res in damfile['stim']]
  53. params = [[res1[0] for res1 in res] for res in params]
  54. values = [[res1 for res1 in res] for res in values]
  55. stims = [dict(zip(param, value)) for param, value in zip(params, values)]
  56. fulldam = zip(stimIndexes, timestamps, signals, stims)
  57. for stimIndex, timestamp, signal, stim in fulldam:
  58. sig = AnalogSignal(signal=signal*pq.mV,
  59. t_start=timestamp*pq.d,
  60. file_origin=filename,
  61. sampling_period=1.*pq.s)
  62. segment = Segment(file_origin=filename,
  63. index=stimIndex,
  64. **stim)
  65. segment.analogsignals = [sig]
  66. block.segments.append(segment)
  67. block.create_many_to_one_relationship()
  68. return block
  69. class BrainwareDamIOTestCase(BaseTestIO, unittest.TestCase):
  70. '''
  71. Unit test testcase for neo.io.BrainwareDamIO
  72. '''
  73. ioclass = BrainwareDamIO
  74. read_and_write_is_bijective = False
  75. # These are the files it tries to read and test for compliance
  76. files_to_test = ['block_300ms_4rep_1clust_part_ch1.dam',
  77. 'interleaved_500ms_5rep_ch2.dam',
  78. 'long_170s_1rep_1clust_ch2.dam',
  79. 'multi_500ms_mulitrep_ch1.dam',
  80. 'random_500ms_12rep_noclust_part_ch2.dam',
  81. 'sequence_500ms_5rep_ch2.dam']
  82. # these are reference files to compare to
  83. files_to_compare = ['block_300ms_4rep_1clust_part_ch1',
  84. 'interleaved_500ms_5rep_ch2',
  85. '',
  86. 'multi_500ms_mulitrep_ch1',
  87. 'random_500ms_12rep_noclust_part_ch2',
  88. 'sequence_500ms_5rep_ch2']
  89. # add the appropriate suffix depending on the python version
  90. for i, fname in enumerate(files_to_compare):
  91. if fname:
  92. files_to_compare[i] += '_dam_py%s.npz' % PY_VER
  93. # Will fetch from g-node if they don't already exist locally
  94. # How does it know to do this before any of the other tests?
  95. files_to_download = files_to_test + files_to_compare
  96. def test_reading_same(self):
  97. for ioobj, path in self.iter_io_objects(return_path=True):
  98. obj_reader_base = create_generic_reader(ioobj, target=False)
  99. obj_reader_single = create_generic_reader(ioobj)
  100. obj_base = obj_reader_base()
  101. obj_single = obj_reader_single()
  102. try:
  103. assert_same_sub_schema(obj_base, obj_single)
  104. except BaseException as exc:
  105. exc.args += ('from ' + os.path.basename(path),)
  106. raise
  107. def test_against_reference(self):
  108. for filename, refname in zip(self.files_to_test,
  109. self.files_to_compare):
  110. if not refname:
  111. continue
  112. obj = self.read_file(filename=filename)
  113. refobj = proc_dam(self.get_filename_path(refname))
  114. try:
  115. assert_neo_object_is_compliant(obj)
  116. assert_neo_object_is_compliant(refobj)
  117. assert_same_sub_schema(obj, refobj)
  118. except BaseException as exc:
  119. exc.args += ('from ' + filename,)
  120. raise
  121. if __name__ == '__main__':
  122. unittest.main()