parallel.py

import concurrent.futures
from functools import update_wrapper, partial


class SingleProcess(object):
    """
    A fall-back parallel context that executes jobs sequentially.
    """

    def __repr__(self):
        return "{name}({extra})".format(name=self.__class__.__name__,
                                        extra=self._extra_repr())

    def _extra_repr(self):
        return ""

    @staticmethod
    def _update_handler(handler, **kwargs):
        # Bind the extra keyword arguments to the handler so that workers only
        # need to receive the single positional argument.
        handler_wrapper = partial(handler, **kwargs)
        update_wrapper(handler_wrapper, handler)
        return handler_wrapper

    def execute(self, handler, args_iterate, **kwargs):
        """
        Executes the queue of
        `[handler(arg, **kwargs) for arg in args_iterate]` in a single process
        (no speedup).

        Parameters
        ----------
        handler : callable
            A function to be executed for each argument in `args_iterate`.
        args_iterate : list
            A list of (different) values of the first argument of the
            `handler` function.
        kwargs
            Additional keyword arguments to `handler`.

        Returns
        -------
        results : list
            The result of applying `handler` to each `arg` in `args_iterate`.
            The `i`-th item of the resulting list corresponds to
            `args_iterate[i]` (the order is preserved).
        """
        handler = self._update_handler(handler, **kwargs)
        results = [handler(arg) for arg in args_iterate]
        return results
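

# Usage sketch (illustrative, not part of the original module): extra keyword
# arguments passed to `execute` are bound to the handler via `partial` before
# the sequential map, so the call below is equivalent to
# `[round(x, ndigits=2) for x in [3.14159, 2.71828]]`:
#
#     SingleProcess().execute(round, [3.14159, 2.71828], ndigits=2)
#     # -> [3.14, 2.72]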


class ProcessPoolExecutor(SingleProcess):
    """
    A wrapper around the Python built-in
    `concurrent.futures.ProcessPoolExecutor` class.

    `ProcessPoolExecutor` is recommended if you work on a single physical
    machine (laptop or PC).

    Parameters
    ----------
    max_workers : int or None
        The maximum number of processes that can be used to execute the given
        calls. If None or not given, as many worker processes will be created
        as the machine has processors.
        Default: None
    """

    def __init__(self, max_workers=None):
        self.max_workers = max_workers

    def _extra_repr(self):
        return "max_workers={0}".format(self.max_workers)

    def _create_executor(self):
        return concurrent.futures.ProcessPoolExecutor(self.max_workers)

    def execute(self, handler, args_iterate, **kwargs):
        """
        Executes the queue of
        `[handler(arg, **kwargs) for arg in args_iterate]` in multiple
        processes within one machine (`ProcessPoolExecutor`) or on multiple
        nodes (`MPIPoolExecutor` and `MPICommExecutor`).

        Parameters
        ----------
        handler : callable
            A function to be executed for each argument in `args_iterate`.
        args_iterate : list
            A list of (different) values of the first argument of the
            `handler` function.
        kwargs
            Additional keyword arguments to `handler`.

        Returns
        -------
        results : list
            The result of applying `handler` to each `arg` in `args_iterate`.
            The `i`-th item of the resulting list corresponds to
            `args_iterate[i]` (the order is preserved).
        """
        handler = self._update_handler(handler, **kwargs)
        # Note: if not properly initialized, MPICommExecutor crashes when the
        # script is not launched in `-m mpi4py.futures` mode.
        with self._create_executor() as executor:
            # Consume the lazy map inside the context so that all results are
            # collected before the executor is shut down.
            results = list(executor.map(handler, args_iterate))
        return results
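

# --- Illustrative usage sketch (not part of the original module) ---
# Exercises both execution contexts with a toy handler. The names
# `_example_power` and `exponent` are hypothetical, introduced only for this
# sketch. The handler lives at module level so that worker processes can
# pickle it, and the pool demo sits behind a `__main__` guard, which
# spawn-based platforms (e.g. Windows and macOS) require for process pools.
def _example_power(value, exponent=2):
    """Toy handler used only by the sketch below."""
    return value ** exponent


if __name__ == "__main__":
    sequential = SingleProcess()
    print(sequential,
          sequential.execute(_example_power, [1, 2, 3], exponent=3))

    pooled = ProcessPoolExecutor(max_workers=2)
    print(pooled,
          pooled.execute(_example_power, [1, 2, 3], exponent=3))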