test_asciisignalio.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335
  1. """
  2. Tests of neo.io.asciisignalio
  3. """
  4. import os
  5. import unittest
  6. import json
  7. import csv
  8. import numpy as np
  9. import quantities as pq
  10. from numpy.testing import assert_array_almost_equal, assert_array_equal
  11. from neo.io import AsciiSignalIO
  12. from neo.test.iotest.common_io_test import BaseTestIO
  13. from neo.core import AnalogSignal, Segment, Block
  14. class TestAsciiSignalIOWithTestFiles(BaseTestIO, unittest.TestCase):
  15. ioclass = AsciiSignalIO
  16. files_to_download = [ # 'File_asciisignal_1.asc',
  17. 'File_asciisignal_2.txt',
  18. 'File_asciisignal_3.txt',
  19. ]
  20. files_to_test = files_to_download
  21. class TestAsciiSignalIO(unittest.TestCase):
  22. def test_genfromtxt_expect_success(self):
  23. sample_data = np.random.uniform(size=(200, 3))
  24. filename = "test_genfromtxt_expect_success.txt"
  25. np.savetxt(filename, sample_data, delimiter=' ')
  26. sampling_rate = 1 * pq.kHz
  27. io = AsciiSignalIO(filename, sampling_rate=sampling_rate, delimiter=' ',
  28. units='mV', method='genfromtxt')
  29. block = io.read_block()
  30. signal1 = block.segments[0].analogsignals[1]
  31. assert_array_almost_equal(signal1.reshape(-1).magnitude, sample_data[:, 1],
  32. decimal=6)
  33. self.assertEqual(len(block.segments[0].analogsignals), 3)
  34. self.assertEqual(signal1.t_stop, sample_data.shape[0] / sampling_rate)
  35. self.assertEqual(signal1.units, pq.mV)
  36. os.remove(filename)
  37. # test_genfromtxt_expect_failure
  38. def test_csv_expect_success(self):
  39. filename = 'test_csv_expect_success.csv'
  40. sample_data = [
  41. (-65, -65, -65, 0.5),
  42. (-64.8, -64.5, -64.0, 0.6),
  43. (-64.6, -64.2, -77.0, 0.7),
  44. (-64.3, -64.0, -99.9, 0.8)
  45. ]
  46. with open(filename, 'w') as csvfile:
  47. writer = csv.writer(csvfile, delimiter=',',
  48. quotechar='|', quoting=csv.QUOTE_MINIMAL)
  49. for row in sample_data:
  50. writer.writerow(row)
  51. io = AsciiSignalIO(filename, usecols=(0, 1, 3), timecolumn=2,
  52. # note that timecolumn applies to the remaining columns
  53. # after applying usecols
  54. time_units="ms", delimiter=',', units="mV", method='csv',
  55. signal_group_mode='all-in-one', t_start=0.5)
  56. block = io.read_block()
  57. signal = block.segments[0].analogsignals[0]
  58. self.assertEqual(signal.shape, (4, 2)) # two columns remaining after usecols
  59. # and timecolumn applied
  60. assert_array_almost_equal(signal[:, 1].reshape(-1).magnitude,
  61. np.array(sample_data)[:, 1],
  62. decimal=5)
  63. self.assertAlmostEqual(signal.sampling_period, 0.1 * pq.ms)
  64. os.remove(filename)
  65. # test_csv_expect_failure
  66. # test_homemade_expect_success
  67. def test_homemade_expect_success(self):
  68. filename = 'test_homemade_expect_success.txt'
  69. sample_data = [
  70. (-65, -65, -65, 0.5),
  71. (-64.8, -64.5, -64.0, 0.6),
  72. (-64.6, -64.2, -77.0, 0.7),
  73. (-64.3, -64.0, -99.9, 0.8)
  74. ]
  75. with open(filename, 'w') as datafile:
  76. datafile.write("# a comment\n")
  77. for row in sample_data:
  78. datafile.write("\t ".join(map(str, row)) + "\t\n")
  79. io = AsciiSignalIO(filename, usecols=(0, 1, 3), timecolumn=2, skiprows=1,
  80. time_units="ms", delimiter='\t', units="mV", method='homemade',
  81. signal_group_mode='all-in-one', t_start=0.5)
  82. block = io.read_block()
  83. signal = block.segments[0].analogsignals[0]
  84. self.assertEqual(signal.shape, (4, 2)) # two columns remaining after usecols
  85. # and timecolumn applied
  86. assert_array_almost_equal(signal[:, 1].reshape(-1).magnitude,
  87. np.array(sample_data)[:, 1],
  88. decimal=5)
  89. self.assertAlmostEqual(signal.sampling_period, 0.1 * pq.ms)
  90. os.remove(filename)
  91. # test_homemade_expect_failure
  92. def test_callable_expect_success(self):
  93. sample_data = np.random.uniform(size=(200, 3))
  94. filename = "test_genfromtxt_expect_success.txt"
  95. np.savetxt(filename, sample_data, delimiter=' ')
  96. sampling_rate = 1 * pq.kHz
  97. def reader(filename, comment_rows):
  98. return np.genfromtxt(filename, delimiter=' ', usecols=None,
  99. skip_header=comment_rows or 0, dtype='f')
  100. io = AsciiSignalIO(filename, sampling_rate=sampling_rate, delimiter=' ',
  101. units='mV', method=reader)
  102. block = io.read_block()
  103. signal1 = block.segments[0].analogsignals[1]
  104. assert_array_almost_equal(signal1.reshape(-1).magnitude, sample_data[:, 1],
  105. decimal=6)
  106. self.assertEqual(len(block.segments[0].analogsignals), 3)
  107. self.assertEqual(signal1.t_stop, sample_data.shape[0] / sampling_rate)
  108. self.assertEqual(signal1.units, pq.mV)
  109. os.remove(filename)
  110. # test usecols
  111. # test skiprows
  112. def test_timecolumn(self):
  113. sample_data = np.random.uniform(size=(200, 3))
  114. sampling_period = 0.5
  115. time_data = sampling_period * np.arange(sample_data.shape[0])
  116. combined_data = np.hstack((sample_data, time_data[:, np.newaxis]))
  117. filename = "test_multichannel.txt"
  118. np.savetxt(filename, combined_data, delimiter=' ')
  119. io = AsciiSignalIO(filename, delimiter=' ',
  120. units='mV', method='genfromtxt', timecolumn=-1,
  121. time_units='ms', signal_group_mode='split-all')
  122. block = io.read_block()
  123. signal1 = block.segments[0].analogsignals[1]
  124. assert_array_almost_equal(signal1.reshape(-1).magnitude, sample_data[:, 1],
  125. decimal=6)
  126. self.assertEqual(signal1.sampling_period, sampling_period * pq.ms)
  127. self.assertEqual(len(block.segments[0].analogsignals), 3)
  128. self.assertEqual(signal1.t_stop, sample_data.shape[0] * sampling_period * pq.ms)
  129. self.assertEqual(signal1.units, pq.mV)
  130. os.remove(filename)
  131. def test_multichannel(self):
  132. sample_data = np.random.uniform(size=(200, 3))
  133. filename = "test_multichannel.txt"
  134. np.savetxt(filename, sample_data, delimiter=' ')
  135. sampling_rate = 1 * pq.kHz
  136. io = AsciiSignalIO(filename, sampling_rate=sampling_rate, delimiter=' ',
  137. units='mV', method='genfromtxt',
  138. signal_group_mode='all-in-one')
  139. block = io.read_block()
  140. signal = block.segments[0].analogsignals[0]
  141. assert_array_almost_equal(signal.magnitude, sample_data,
  142. decimal=6)
  143. self.assertEqual(len(block.segments[0].analogsignals), 1)
  144. self.assertEqual(signal.t_stop, sample_data.shape[0] / sampling_rate)
  145. self.assertEqual(signal.units, pq.mV)
  146. os.remove(filename)
  147. def test_multichannel_with_timecolumn(self):
  148. sample_data = np.random.uniform(size=(200, 3))
  149. sampling_period = 0.5
  150. time_data = sampling_period * np.arange(sample_data.shape[0])
  151. combined_data = np.hstack((time_data[:, np.newaxis], sample_data))
  152. filename = "test_multichannel.txt"
  153. np.savetxt(filename, combined_data, delimiter=' ')
  154. io = AsciiSignalIO(filename, delimiter=' ',
  155. units='mV', method='genfromtxt', timecolumn=0,
  156. time_units='ms',
  157. signal_group_mode='all-in-one')
  158. block = io.read_block()
  159. signal = block.segments[0].analogsignals[0]
  160. assert_array_almost_equal(signal.magnitude, sample_data,
  161. decimal=6)
  162. self.assertEqual(signal.sampling_period, sampling_period * pq.ms)
  163. self.assertEqual(len(block.segments[0].analogsignals), 1)
  164. self.assertEqual(signal.t_stop, sample_data.shape[0] * sampling_period * pq.ms)
  165. self.assertEqual(signal.units, pq.mV)
  166. os.remove(filename)
  167. def test_multichannel_with_negative_timecolumn(self):
  168. sample_data = np.random.uniform(size=(200, 3))
  169. sampling_period = 0.5
  170. time_data = sampling_period * np.arange(sample_data.shape[0])
  171. combined_data = np.hstack((sample_data, time_data[:, np.newaxis]))
  172. filename = "test_multichannel.txt"
  173. np.savetxt(filename, combined_data, delimiter=' ')
  174. io = AsciiSignalIO(filename, delimiter=' ',
  175. units='mV', method='genfromtxt', timecolumn=-1,
  176. time_units='ms',
  177. signal_group_mode='all-in-one')
  178. block = io.read_block()
  179. signal = block.segments[0].analogsignals[0]
  180. assert_array_almost_equal(signal.magnitude, sample_data,
  181. decimal=6)
  182. self.assertEqual(signal.sampling_period, sampling_period * pq.ms)
  183. self.assertEqual(len(block.segments[0].analogsignals), 1)
  184. self.assertEqual(signal.t_stop, sample_data.shape[0] * sampling_period * pq.ms)
  185. self.assertEqual(signal.units, pq.mV)
  186. os.remove(filename)
  187. # test write without timecolumn
  188. # test write with timecolumn
  189. # test write with units/timeunits different from those of signal
  190. def test_read_with_json_metadata(self):
  191. sample_data = np.random.uniform(size=(200, 3))
  192. filename = "test_read_with_json_metadata.txt"
  193. metadata_filename = "test_read_with_json_metadata_about.json"
  194. np.savetxt(filename, sample_data, delimiter=' ')
  195. metadata = {
  196. "filename": filename,
  197. "delimiter": " ",
  198. "timecolumn": None,
  199. "units": "mV",
  200. "time_units": "ms",
  201. "sampling_rate": {
  202. "value": 1.0,
  203. "units": "kHz"
  204. },
  205. "method": "genfromtxt",
  206. "signal_group_mode": 'split-all'
  207. }
  208. with open(metadata_filename, "w") as fp:
  209. json.dump(metadata, fp)
  210. expected_sampling_rate = 1 * pq.kHz
  211. io = AsciiSignalIO(filename)
  212. block = io.read_block()
  213. signal1 = block.segments[0].analogsignals[1]
  214. assert_array_almost_equal(signal1.reshape(-1).magnitude, sample_data[:, 1],
  215. decimal=6)
  216. self.assertEqual(len(block.segments[0].analogsignals), 3)
  217. self.assertEqual(signal1.sampling_rate, expected_sampling_rate)
  218. self.assertEqual(signal1.t_stop, sample_data.shape[0] / signal1.sampling_rate)
  219. self.assertEqual(signal1.units, pq.mV)
  220. os.remove(filename)
  221. os.remove(metadata_filename)
  222. def test_roundtrip_with_json_metadata(self):
  223. sample_data = np.random.uniform(size=(200, 3))
  224. filename = "test_roundtrip_with_json_metadata.txt"
  225. metadata_filename = "test_roundtrip_with_json_metadata_about.json"
  226. signal1 = AnalogSignal(sample_data, units="pA", sampling_rate=2 * pq.kHz)
  227. seg1 = Segment()
  228. block1 = Block()
  229. seg1.analogsignals.append(signal1)
  230. seg1.block = block1
  231. block1.segments.append(seg1)
  232. iow = AsciiSignalIO(filename, metadata_filename=metadata_filename)
  233. iow.write_block(block1)
  234. self.assert_(os.path.exists(metadata_filename))
  235. ior = AsciiSignalIO(filename)
  236. block2 = ior.read_block()
  237. assert len(block2.segments[0].analogsignals) == 3
  238. signal2 = block2.segments[0].analogsignals[1]
  239. assert_array_almost_equal(signal1.magnitude[:, 1], signal2.magnitude.reshape(-1),
  240. decimal=7)
  241. self.assertEqual(signal1.units, signal2.units)
  242. self.assertEqual(signal1.sampling_rate, signal2.sampling_rate)
  243. assert_array_equal(signal1.times, signal2.times)
  244. os.remove(filename)
  245. os.remove(metadata_filename)
  246. def test_genfromtxt_irregular_expect_success(self):
  247. sample_data = np.random.uniform(size=(200, 3))
  248. sample_data[:, 0] = np.sort(sample_data[:, 0]) # make column 0 the time column
  249. filename = "test_genfromtxt_irregular_expect_success.txt"
  250. np.savetxt(filename, sample_data, delimiter=' ')
  251. io = AsciiSignalIO(filename, delimiter=' ', timecolumn=0,
  252. units='mV', method='genfromtxt', signal_group_mode='split-all')
  253. block = io.read_block()
  254. signal1 = block.segments[0].irregularlysampledsignals[1]
  255. assert_array_almost_equal(signal1.reshape(-1).magnitude, sample_data[:, 2],
  256. decimal=6)
  257. self.assertEqual(len(block.segments[0].analogsignals), 0)
  258. self.assertEqual(len(block.segments[0].irregularlysampledsignals), 2)
  259. self.assertEqual(signal1.units, pq.mV)
  260. os.remove(filename)
  261. def test_irregular_multichannel(self):
  262. sample_data = np.random.uniform(size=(200, 3))
  263. sample_data[:, 0] = np.sort(sample_data[:, 0]) # make column 0 the time column
  264. filename = "test_irregular_multichannel.txt"
  265. np.savetxt(filename, sample_data, delimiter=' ')
  266. io = AsciiSignalIO(filename, delimiter=' ', timecolumn=0,
  267. units='mV', method='genfromtxt', signal_group_mode='all-in-one')
  268. block = io.read_block()
  269. signal = block.segments[0].irregularlysampledsignals[0]
  270. assert_array_almost_equal(signal.magnitude, sample_data[:, 1:3],
  271. decimal=6)
  272. self.assertEqual(len(block.segments[0].analogsignals), 0)
  273. self.assertEqual(len(block.segments[0].irregularlysampledsignals), 1)
  274. self.assertEqual(signal.shape, (200, 2))
  275. self.assertEqual(signal.units, pq.mV)
  276. os.remove(filename)
  277. if __name__ == "__main__":
  278. unittest.main()