tools.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458
  1. # -*- coding: utf-8 -*-
  2. """
  3. Common tools that are useful for neo.io object tests
  4. """
  5. # needed for python 3 compatibility
  6. from __future__ import absolute_import
  7. import logging
  8. import os
  9. import shutil
  10. import tempfile
  11. try:
  12. from urllib import urlretrieve # Py2
  13. except ImportError:
  14. from urllib.request import urlretrieve # Py3
  15. from neo.core import Block, Segment
  16. from neo.test.generate_datasets import generate_from_supported_objects
  17. def can_use_network():
  18. """
  19. Return True if network access is allowed
  20. """
  21. if os.environ.get('NOSETESTS_NO_NETWORK', False):
  22. return False
  23. if os.environ.get('TRAVIS') == 'true':
  24. return False
  25. return True
  26. def make_all_directories(filename, localdir):
  27. """
  28. Make the directories needed to store test files
  29. """
  30. # handle case of multiple filenames
  31. if not hasattr(filename, 'lower'):
  32. for ifilename in filename:
  33. make_all_directories(ifilename, localdir)
  34. return
  35. fullpath = os.path.join(localdir, os.path.dirname(filename))
  36. if os.path.dirname(filename) != '' and not os.path.exists(fullpath):
  37. if not os.path.exists(os.path.dirname(fullpath)):
  38. make_all_directories(os.path.dirname(filename), localdir)
  39. os.mkdir(fullpath)
  40. def download_test_file(filename, localdir, url):
  41. """
  42. Download a test file from a server if it isn't already available.
  43. filename is the name of the file.
  44. localdir is the local directory to store the file in.
  45. url is the remote url that the file should be downloaded from.
  46. """
  47. # handle case of multiple filenames
  48. if not hasattr(filename, 'lower'):
  49. for ifilename in filename:
  50. download_test_file(ifilename, localdir, url)
  51. return
  52. localfile = os.path.join(localdir, filename)
  53. distantfile = url + '/' + filename
  54. if not os.path.exists(localfile):
  55. logging.info('Downloading %s here %s', distantfile, localfile)
  56. urlretrieve(distantfile, localfile)
  57. def create_local_temp_dir(name, directory=None):
  58. """
  59. Create a directory for storing temporary files needed for testing neo
  60. If directory is None or not specified, automatically create the directory
  61. in {tempdir}/files_for_testing_neo on linux/unix/mac or
  62. {tempdir}\files_for_testing_neo on windows, where {tempdir} is the system
  63. temporary directory returned by tempfile.gettempdir().
  64. """
  65. if directory is None:
  66. directory = os.path.join(tempfile.gettempdir(),
  67. 'files_for_testing_neo')
  68. if not os.path.exists(directory):
  69. os.mkdir(directory)
  70. directory = os.path.join(directory, name)
  71. if not os.path.exists(directory):
  72. os.mkdir(directory)
  73. return directory
  74. def close_object_safe(obj):
  75. """
  76. Close an object safely, ignoring errors
  77. For some io types, like HDF5IO, the file should be closed before being
  78. opened again in a test. Call this after the test is done to make sure
  79. the file is closed.
  80. """
  81. try:
  82. obj.close()
  83. except:
  84. pass
  85. def cleanup_test_file(mode, path, directory=None):
  86. """
  87. Remove test files or directories safely. mode is the mode of the io class,
  88. either 'file' or 'directory'. It can also be an io class object, or any
  89. other object with a 'mode' attribute. If that is the case, use the
  90. 'mode' attribute from the object.
  91. If directory is not None and path is not an absolute path already,
  92. use the file from the given directory.
  93. """
  94. if directory is not None and not os.path.isabs(path):
  95. path = os.path.join(directory, path)
  96. if hasattr(mode, 'mode'):
  97. mode = mode.mode
  98. if mode == 'file':
  99. if os.path.exists(path):
  100. os.remove(path)
  101. elif mode == 'dir':
  102. if os.path.exists(path):
  103. shutil.rmtree(path)
  104. def get_test_file_full_path(ioclass, filename=None,
  105. directory=None, clean=False):
  106. """
  107. Get the full path for a file of the given filename.
  108. If filename is None, create a filename.
  109. If filename is a list, get the full path for each item in the list.
  110. If return_path is True, also return the full path to the file.
  111. If directory is not None and path is not an absolute path already,
  112. use the file from the given directory.
  113. If return_path is True, return the full path of the file along with
  114. the io object. return reader, path. Default is False.
  115. If clean is True, try to delete existing versions of the file
  116. before creating the io object. Default is False.
  117. """
  118. # create a filename if none is provided
  119. if filename is None:
  120. filename = 'Generated0_%s' % ioclass.__name__
  121. if (ioclass.mode == 'file' and len(ioclass.extensions) >= 1):
  122. filename += '.' + ioclass.extensions[0]
  123. elif not hasattr(filename, 'lower'):
  124. return [get_test_file_full_path(ioclass, filename=fname,
  125. directory=directory, clean=clean) for
  126. fname in filename]
  127. # if a directory is provided add it
  128. if directory is not None and not os.path.isabs(filename):
  129. filename = os.path.join(directory, filename)
  130. if clean:
  131. cleanup_test_file(ioclass, filename)
  132. return filename
  133. get_test_file_full_path.__test__ = False # prevent this being treated as a test if imported into a test file
  134. def create_generic_io_object(ioclass, filename=None, directory=None,
  135. return_path=False, clean=False):
  136. """
  137. Create an io object in a generic way that can work with both
  138. file-based and directory-based io objects
  139. If filename is None, create a filename.
  140. If return_path is True, also return the full path to the file.
  141. If directory is not None and path is not an absolute path already,
  142. use the file from the given directory.
  143. If return_path is True, return the full path of the file along with
  144. the io object. return reader, path. Default is False.
  145. If clean is True, try to delete existing versions of the file
  146. before creating the io object. Default is False.
  147. """
  148. filename = get_test_file_full_path(ioclass, filename=filename,
  149. directory=directory, clean=clean)
  150. try:
  151. # actually create the object
  152. if ioclass.mode == 'file':
  153. ioobj = ioclass(filename=filename)
  154. elif ioclass.mode == 'dir':
  155. ioobj = ioclass(dirname=filename)
  156. else:
  157. ioobj = None
  158. except:
  159. print(filename)
  160. raise
  161. # return the full path if requested, otherwise don't
  162. if return_path:
  163. return ioobj, filename
  164. return ioobj
  165. def iter_generic_io_objects(ioclass, filenames, directory=None,
  166. return_path=False, clean=False):
  167. """
  168. Return an iterable over the io objects created from a list of filenames.
  169. The objects are automatically cleaned up afterwards.
  170. If directory is not None and path is not an absolute path already,
  171. use the file from the given directory.
  172. If return_path is True, yield the full path of the file along with
  173. the io object. yield reader, path. Default is False.
  174. If clean is True, try to delete existing versions of the file
  175. before creating the io object. Default is False.
  176. """
  177. for filename in filenames:
  178. ioobj, path = create_generic_io_object(ioclass, filename=filename,
  179. directory=directory,
  180. return_path=True,
  181. clean=clean)
  182. if ioobj is None:
  183. continue
  184. if return_path:
  185. yield ioobj, path
  186. else:
  187. yield ioobj
  188. close_object_safe(ioobj)
  189. def create_generic_reader(ioobj, target=None, readall=False):
  190. """
  191. Create a function that can read the target object from a file.
  192. If target is None, use the first supported_objects from ioobj
  193. If target is False, use the 'read' method.
  194. If target is the Block or Segment class, use read_block or read_segment,
  195. respectively.
  196. If target is a string, use 'read_'+target.
  197. If readall is True, use the read_all_ method instead of the read_ method.
  198. Default is False.
  199. """
  200. if target is None:
  201. target = ioobj.supported_objects[0].__name__
  202. if target == Block:
  203. if readall:
  204. return ioobj.read_all_blocks
  205. return ioobj.read_block
  206. elif target == Segment:
  207. if readall:
  208. return ioobj.read_all_segments
  209. return ioobj.read_segment
  210. elif not target:
  211. if readall:
  212. raise ValueError('readall cannot be True if target is False')
  213. return ioobj.read
  214. elif hasattr(target, 'lower'):
  215. if readall:
  216. return getattr(ioobj, 'read_all_%ss' % target.lower())
  217. return getattr(ioobj, 'read_%s' % target.lower())
  218. def iter_generic_readers(ioclass, filenames, directory=None, target=None,
  219. return_path=False, return_ioobj=False,
  220. clean=False, readall=False):
  221. """
  222. Iterate over functions that can read the target object from a list of
  223. filenames.
  224. If target is None, use the first supported_objects from ioobj
  225. If target is False, use the 'read' method.
  226. If target is the Block or Segment class, use read_block or read_segment,
  227. respectively.
  228. If target is a string, use 'read_'+target.
  229. If directory is not None and path is not an absolute path already,
  230. use the file from the given directory.
  231. If return_path is True, return the full path of the file along with
  232. the reader object. return reader, path.
  233. If return_ioobj is True, return the io object as well as the reader.
  234. return reader, ioobj. Default is False.
  235. If both return_path and return_ioobj is True,
  236. return reader, path, ioobj. Default is False.
  237. If clean is True, try to delete existing versions of the file
  238. before creating the io object. Default is False.
  239. If readall is True, use the read_all_ method instead of the read_ method.
  240. Default is False.
  241. """
  242. for ioobj, path in iter_generic_io_objects(ioclass=ioclass,
  243. filenames=filenames,
  244. directory=directory,
  245. return_path=True,
  246. clean=clean):
  247. res = create_generic_reader(ioobj, target=target, readall=readall)
  248. if not return_path and not return_ioobj:
  249. yield res
  250. else:
  251. res = (res, )
  252. if return_path:
  253. res = res + (path,)
  254. if return_ioobj:
  255. res = res + (ioobj,)
  256. yield res
  257. def create_generic_writer(ioobj, target=None):
  258. """
  259. Create a function that can write the target object to a file using the
  260. neo io object ioobj.
  261. If target is None, use the first supported_objects from ioobj
  262. If target is False, use the 'write' method.
  263. If target is the Block or Segment class, use write_block or write_segment,
  264. respectively.
  265. If target is a string, use 'write_'+target.
  266. """
  267. if target is None:
  268. target = ioobj.supported_objects[0].__name__
  269. if target == Block:
  270. return ioobj.write_block
  271. elif target == Segment:
  272. return ioobj.write_segment
  273. elif not target:
  274. return ioobj.write
  275. elif hasattr(target, 'lower'):
  276. return getattr(ioobj, 'write_' + target.lower())
  277. def read_generic(ioobj, target=None, cascade=True, lazy=False, readall=False,
  278. return_reader=False):
  279. """
  280. Read the target object from a file using the given neo io object ioobj.
  281. If target is None, use the first supported_objects from ioobj
  282. If target is False, use the 'write' method.
  283. If target is the Block or Segment class, use write_block or write_segment,
  284. respectively.
  285. If target is a string, use 'write_'+target.
  286. The cascade and lazy parameters are passed to the reader. Defaults
  287. are True and False, respectively.
  288. If readall is True, use the read_all_ method instead of the read_ method.
  289. Default is False.
  290. If return_reader is True, yield the io reader function as well as the
  291. object. yield obj, reader. Default is False.
  292. """
  293. obj_reader = create_generic_reader(ioobj, target=target, readall=readall)
  294. obj = obj_reader(cascade=cascade, lazy=lazy)
  295. if return_reader:
  296. return obj, obj_reader
  297. return obj
  298. def iter_read_objects(ioclass, filenames, directory=None, target=None,
  299. return_path=False, return_ioobj=False,
  300. return_reader=False, clean=False, readall=False,
  301. cascade=True, lazy=False):
  302. """
  303. Iterate over objects read from a list of filenames.
  304. If target is None, use the first supported_objects from ioobj
  305. If target is False, use the 'read' method.
  306. If target is the Block or Segment class, use read_block or read_segment,
  307. respectively.
  308. If target is a string, use 'read_'+target.
  309. If directory is not None and path is not an absolute path already,
  310. use the file from the given directory.
  311. If return_path is True, yield the full path of the file along with
  312. the object. yield obj, path.
  313. If return_ioobj is True, yield the io object as well as the object.
  314. yield obj, ioobj. Default is False.
  315. If return_reader is True, yield the io reader function as well as the
  316. object. yield obj, reader. Default is False.
  317. If some combination of return_path, return_ioobj, and return_reader
  318. is True, they are yielded in the order: obj, path, ioobj, reader.
  319. If clean is True, try to delete existing versions of the file
  320. before creating the io object. Default is False.
  321. The cascade and lazy parameters are passed to the reader. Defaults
  322. are True and False, respectively.
  323. If readall is True, use the read_all_ method instead of the read_ method.
  324. Default is False.
  325. """
  326. for obj_reader, path, ioobj in iter_generic_readers(ioclass, filenames,
  327. directory=directory,
  328. target=target,
  329. return_path=True,
  330. return_ioobj=True,
  331. clean=clean,
  332. readall=readall):
  333. obj = obj_reader(cascade=cascade, lazy=lazy)
  334. if not return_path and not return_ioobj and not return_reader:
  335. yield obj
  336. else:
  337. obj = (obj, )
  338. if return_path:
  339. obj = obj + (path,)
  340. if return_ioobj:
  341. obj = obj + (ioobj,)
  342. if return_reader:
  343. obj = obj + (obj_reader,)
  344. yield obj
  345. def write_generic(ioobj, target=None, obj=None, return_writer=False):
  346. """
  347. Write the target object to a file using the given neo io object ioobj.
  348. If target is None, use the first supported_objects from ioobj
  349. If target is False, use the 'write' method.
  350. If target is the Block or Segment class, use write_block or write_segment,
  351. respectively.
  352. If target is a string, use 'write_'+target.
  353. obj is the object to write. If obj is None, an object is created
  354. automatically for the io class.
  355. If return_writer is True, yield the io writer function as well as the
  356. object. yield obj, writer. Default is False.
  357. """
  358. if obj is None:
  359. supported_objects = ioobj.supported_objects
  360. obj = generate_from_supported_objects(supported_objects)
  361. obj_writer = create_generic_writer(ioobj, target=target)
  362. obj_writer(obj)
  363. if return_writer:
  364. return obj, obj_writer
  365. return obj