tools.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376
  1. """
  2. Common tools that are useful for neo.io object tests
  3. """
  4. import os
  5. import shutil
  6. from neo.core import Block, Segment
  7. from neo.test.generate_datasets import generate_from_supported_objects
  8. def close_object_safe(obj):
  9. """
  10. Close an object safely, ignoring errors
  11. For some io types, like HDF5IO, the file should be closed before being
  12. opened again in a test. Call this after the test is done to make sure
  13. the file is closed.
  14. """
  15. try:
  16. obj.close()
  17. except:
  18. pass
  19. def cleanup_test_file(mode, path, directory=None):
  20. """
  21. Remove test files or directories safely. mode is the mode of the io class,
  22. either 'file' or 'directory'. It can also be an io class object, or any
  23. other object with a 'mode' attribute. If that is the case, use the
  24. 'mode' attribute from the object.
  25. If directory is not None and path is not an absolute path already,
  26. use the file from the given directory.
  27. """
  28. if directory is not None and not os.path.isabs(path):
  29. path = os.path.join(directory, path)
  30. if hasattr(mode, 'mode'):
  31. mode = mode.mode
  32. if mode == 'file':
  33. if os.path.exists(path):
  34. os.remove(path)
  35. elif mode == 'dir':
  36. if os.path.exists(path):
  37. shutil.rmtree(path)
  38. def get_test_file_full_path(ioclass, filename=None,
  39. directory=None, clean=False):
  40. """
  41. Get the full path for a file of the given filename.
  42. If filename is None, create a filename.
  43. If filename is a list, get the full path for each item in the list.
  44. If return_path is True, also return the full path to the file.
  45. If directory is not None and path is not an absolute path already,
  46. use the file from the given directory.
  47. If return_path is True, return the full path of the file along with
  48. the io object. return reader, path. Default is False.
  49. If clean is True, try to delete existing versions of the file
  50. before creating the io object. Default is False.
  51. """
  52. # create a filename if none is provided
  53. if filename is None:
  54. filename = 'Generated0_%s' % ioclass.__name__
  55. if (ioclass.mode == 'file' and len(ioclass.extensions) >= 1):
  56. filename += '.' + ioclass.extensions[0]
  57. elif not hasattr(filename, 'lower'):
  58. return [get_test_file_full_path(ioclass, filename=fname,
  59. directory=directory, clean=clean) for
  60. fname in filename]
  61. # if a directory is provided add it
  62. if directory is not None and not os.path.isabs(filename):
  63. filename = os.path.join(directory, filename)
  64. if clean:
  65. cleanup_test_file(ioclass, filename)
  66. return filename
  67. # prevent this being treated as a test if imported into a test file
  68. get_test_file_full_path.__test__ = False
  69. def create_generic_io_object(ioclass, filename=None, directory=None,
  70. return_path=False, clean=False):
  71. """
  72. Create an io object in a generic way that can work with both
  73. file-based and directory-based io objects
  74. If filename is None, create a filename.
  75. If return_path is True, also return the full path to the file.
  76. If directory is not None and path is not an absolute path already,
  77. use the file from the given directory.
  78. If return_path is True, return the full path of the file along with
  79. the io object. return reader, path. Default is False.
  80. If clean is True, try to delete existing versions of the file
  81. before creating the io object. Default is False.
  82. """
  83. filename = get_test_file_full_path(ioclass, filename=filename,
  84. directory=directory, clean=clean)
  85. try:
  86. # actually create the object
  87. if ioclass.mode == 'file':
  88. ioobj = ioclass(filename=filename)
  89. elif ioclass.mode == 'dir':
  90. ioobj = ioclass(dirname=filename)
  91. else:
  92. ioobj = None
  93. except:
  94. print(filename)
  95. raise
  96. # return the full path if requested, otherwise don't
  97. if return_path:
  98. return ioobj, filename
  99. return ioobj
  100. def iter_generic_io_objects(ioclass, filenames, directory=None,
  101. return_path=False, clean=False):
  102. """
  103. Return an iterable over the io objects created from a list of filenames.
  104. The objects are automatically cleaned up afterwards.
  105. If directory is not None and path is not an absolute path already,
  106. use the file from the given directory.
  107. If return_path is True, yield the full path of the file along with
  108. the io object. yield reader, path. Default is False.
  109. If clean is True, try to delete existing versions of the file
  110. before creating the io object. Default is False.
  111. """
  112. for filename in filenames:
  113. ioobj, path = create_generic_io_object(ioclass, filename=filename,
  114. directory=directory,
  115. return_path=True,
  116. clean=clean)
  117. if ioobj is None:
  118. continue
  119. if return_path:
  120. yield ioobj, path
  121. else:
  122. yield ioobj
  123. close_object_safe(ioobj)
  124. def create_generic_reader(ioobj, target=None, readall=False):
  125. """
  126. Create a function that can read the target object from a file.
  127. If target is None, use the first supported_objects from ioobj
  128. If target is False, use the 'read' method.
  129. If target is the Block or Segment class, use read_block or read_segment,
  130. respectively.
  131. If target is a string, use 'read_'+target.
  132. If readall is True, use the read_all_ method instead of the read_ method.
  133. Default is False.
  134. """
  135. if target is None:
  136. target = ioobj.supported_objects[0].__name__
  137. if target == Block:
  138. if readall:
  139. return ioobj.read_all_blocks
  140. return ioobj.read_block
  141. elif target == Segment:
  142. if readall:
  143. return ioobj.read_all_segments
  144. return ioobj.read_segment
  145. elif not target:
  146. if readall:
  147. raise ValueError('readall cannot be True if target is False')
  148. return ioobj.read
  149. elif hasattr(target, 'lower'):
  150. if readall:
  151. return getattr(ioobj, 'read_all_%ss' % target.lower())
  152. return getattr(ioobj, 'read_%s' % target.lower())
  153. def iter_generic_readers(ioclass, filenames, directory=None, target=None,
  154. return_path=False, return_ioobj=False,
  155. clean=False, readall=False):
  156. """
  157. Iterate over functions that can read the target object from a list of
  158. filenames.
  159. If target is None, use the first supported_objects from ioobj
  160. If target is False, use the 'read' method.
  161. If target is the Block or Segment class, use read_block or read_segment,
  162. respectively.
  163. If target is a string, use 'read_'+target.
  164. If directory is not None and path is not an absolute path already,
  165. use the file from the given directory.
  166. If return_path is True, return the full path of the file along with
  167. the reader object. return reader, path.
  168. If return_ioobj is True, return the io object as well as the reader.
  169. return reader, ioobj. Default is False.
  170. If both return_path and return_ioobj is True,
  171. return reader, path, ioobj. Default is False.
  172. If clean is True, try to delete existing versions of the file
  173. before creating the io object. Default is False.
  174. If readall is True, use the read_all_ method instead of the read_ method.
  175. Default is False.
  176. """
  177. for ioobj, path in iter_generic_io_objects(ioclass=ioclass,
  178. filenames=filenames,
  179. directory=directory,
  180. return_path=True,
  181. clean=clean):
  182. res = create_generic_reader(ioobj, target=target, readall=readall)
  183. if not return_path and not return_ioobj:
  184. yield res
  185. else:
  186. res = (res,)
  187. if return_path:
  188. res = res + (path,)
  189. if return_ioobj:
  190. res = res + (ioobj,)
  191. yield res
  192. def create_generic_writer(ioobj, target=None):
  193. """
  194. Create a function that can write the target object to a file using the
  195. neo io object ioobj.
  196. If target is None, use the first supported_objects from ioobj
  197. If target is False, use the 'write' method.
  198. If target is the Block or Segment class, use write_block or write_segment,
  199. respectively.
  200. If target is a string, use 'write_'+target.
  201. """
  202. if target is None:
  203. target = ioobj.supported_objects[0].__name__
  204. if target == Block:
  205. return ioobj.write_block
  206. elif target == Segment:
  207. return ioobj.write_segment
  208. elif not target:
  209. return ioobj.write
  210. elif hasattr(target, 'lower'):
  211. return getattr(ioobj, 'write_' + target.lower())
  212. def read_generic(ioobj, target=None, lazy=False, readall=False,
  213. return_reader=False):
  214. """
  215. Read the target object from a file using the given neo io object ioobj.
  216. If target is None, use the first supported_objects from ioobj
  217. If target is False, use the 'write' method.
  218. If target is the Block or Segment class, use write_block or write_segment,
  219. respectively.
  220. If target is a string, use 'write_'+target.
  221. The lazy parameters is passed to the reader. Defaults is True.
  222. If readall is True, use the read_all_ method instead of the read_ method.
  223. Default is False.
  224. If return_reader is True, yield the io reader function as well as the
  225. object. yield obj, reader. Default is False.
  226. """
  227. obj_reader = create_generic_reader(ioobj, target=target, readall=readall)
  228. obj = obj_reader(lazy=lazy)
  229. if return_reader:
  230. return obj, obj_reader
  231. return obj
  232. def iter_read_objects(ioclass, filenames, directory=None, target=None,
  233. return_path=False, return_ioobj=False,
  234. return_reader=False, clean=False, readall=False,
  235. lazy=False):
  236. """
  237. Iterate over objects read from a list of filenames.
  238. If target is None, use the first supported_objects from ioobj
  239. If target is False, use the 'read' method.
  240. If target is the Block or Segment class, use read_block or read_segment,
  241. respectively.
  242. If target is a string, use 'read_'+target.
  243. If directory is not None and path is not an absolute path already,
  244. use the file from the given directory.
  245. If return_path is True, yield the full path of the file along with
  246. the object. yield obj, path.
  247. If return_ioobj is True, yield the io object as well as the object.
  248. yield obj, ioobj. Default is False.
  249. If return_reader is True, yield the io reader function as well as the
  250. object. yield obj, reader. Default is False.
  251. If some combination of return_path, return_ioobj, and return_reader
  252. is True, they are yielded in the order: obj, path, ioobj, reader.
  253. If clean is True, try to delete existing versions of the file
  254. before creating the io object. Default is False.
  255. The lazy parameter is passed to the reader. Defaults is True.
  256. If readall is True, use the read_all_ method instead of the read_ method.
  257. Default is False.
  258. """
  259. for obj_reader, path, ioobj in iter_generic_readers(ioclass, filenames,
  260. directory=directory,
  261. target=target,
  262. return_path=True,
  263. return_ioobj=True,
  264. clean=clean,
  265. readall=readall):
  266. obj = obj_reader(lazy=lazy)
  267. if not return_path and not return_ioobj and not return_reader:
  268. yield obj
  269. else:
  270. obj = (obj,)
  271. if return_path:
  272. obj = obj + (path,)
  273. if return_ioobj:
  274. obj = obj + (ioobj,)
  275. if return_reader:
  276. obj = obj + (obj_reader,)
  277. yield obj
  278. def write_generic(ioobj, target=None, obj=None, return_writer=False):
  279. """
  280. Write the target object to a file using the given neo io object ioobj.
  281. If target is None, use the first supported_objects from ioobj
  282. If target is False, use the 'write' method.
  283. If target is the Block or Segment class, use write_block or write_segment,
  284. respectively.
  285. If target is a string, use 'write_'+target.
  286. obj is the object to write. If obj is None, an object is created
  287. automatically for the io class.
  288. If return_writer is True, yield the io writer function as well as the
  289. object. yield obj, writer. Default is False.
  290. """
  291. if obj is None:
  292. supported_objects = ioobj.supported_objects
  293. obj = generate_from_supported_objects(supported_objects)
  294. obj_writer = create_generic_writer(ioobj, target=target)
  295. obj_writer(obj)
  296. if return_writer:
  297. return obj, obj_writer
  298. return obj