dataladcmd_exec.py 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217
  1. from concurrent.futures import ThreadPoolExecutor
  2. import threading
  3. from time import time
  4. from types import MappingProxyType
  5. from typing import (
  6. Dict,
  7. )
  8. from PySide6.QtCore import (
  9. QObject,
  10. Qt,
  11. Signal,
  12. Slot,
  13. )
  14. from PySide6.QtWidgets import (
  15. QToolButton,
  16. )
  17. from datalad.interface.base import Interface
  18. from datalad.support.exceptions import CapturedException
  19. from datalad.utils import get_wrapped_class
  20. from .resource_provider import gooey_resources
  21. # lazy import
  22. dlapi = None
  23. class GooeyDataladCmdExec(QObject):
  24. """Non-blocking execution of DataLad API commands
  25. and Qt-signal result reporting
  26. """
  27. # thread_id, cmdname, cmdargs/kwargs, exec_params
  28. execution_started = Signal(str, str, MappingProxyType, MappingProxyType)
  29. execution_finished = Signal(str, str, MappingProxyType, MappingProxyType)
  30. # thread_id, cmdname, cmdargs/kwargs, exec_params, CapturedException
  31. execution_failed = Signal(str, str, MappingProxyType, MappingProxyType, CapturedException)
  32. results_received = Signal(Interface, list)
  33. def __init__(self):
  34. super().__init__()
  35. aw = QToolButton()
  36. aw.setAutoRaise(True)
  37. aw.clicked.connect(self._stop_thread)
  38. aw.hide()
  39. self._activity_widget = aw
  40. self.execution_started.connect(self._enable_activity_widget)
  41. self.execution_finished.connect(self._disable_activity_widget)
  42. self.execution_failed.connect(self._disable_activity_widget)
  43. # flag whether a running thread should stop ASAP
  44. self._kaboom = False
  45. self._threadpool = ThreadPoolExecutor(
  46. max_workers=1,
  47. thread_name_prefix='gooey_datalad_cmdexec',
  48. # some callable to start at each thread execution
  49. #initializer=self.
  50. #initargs=
  51. )
  52. self._futures = set()
  53. # connect maintenance slot to give us an accurate
  54. # assessment of ongoing commands
  55. self.execution_finished.connect(self._update_futures)
  56. self.execution_failed.connect(self._update_futures)
  57. def _update_futures(self):
  58. self._futures = set(f for f in self._futures if f.running())
  59. @Slot(str, dict)
  60. def execute(self, cmd: str,
  61. kwargs: MappingProxyType or None = None,
  62. exec_params: MappingProxyType or None = None):
  63. if kwargs is None:
  64. kwargs = dict()
  65. if exec_params is None:
  66. exec_params = dict()
  67. global dlapi
  68. if dlapi is None:
  69. from datalad import api as dl
  70. dlapi = dl
  71. # right now, we have no use for the returned future, because result
  72. # communication and thread finishing are handled by emitting Qt signals
  73. self._futures.add(self._threadpool.submit(
  74. self._cmdexec_thread,
  75. cmd,
  76. kwargs,
  77. exec_params,
  78. ))
  79. def _cmdexec_thread(
  80. self, cmdname: str,
  81. cmdkwargs: MappingProxyType,
  82. exec_params: MappingProxyType):
  83. """The code is executed in a worker thread"""
  84. # we need to amend the record below, make a mutable version
  85. cmdkwargs = cmdkwargs.copy()
  86. print('EXECINTHREAD', cmdname, cmdkwargs, exec_params)
  87. preferred_result_interval = exec_params.get(
  88. 'preferred_result_interval', 1.0)
  89. res_override = exec_params.get(
  90. 'result_override', {})
  91. # get_ident() is an int, but in the future we might want to move
  92. # to PY3.8+ native thread IDs, so let's go with a string identifier
  93. # right away
  94. thread_id = str(threading.get_ident())
  95. # get functor to execute, resolve name against full API
  96. try:
  97. cmd = getattr(dlapi, cmdname)
  98. cls = get_wrapped_class(cmd)
  99. except Exception as e:
  100. self.execution_failed.emit(
  101. thread_id,
  102. cmdname,
  103. cmdkwargs,
  104. exec_params,
  105. CapturedException(e),
  106. )
  107. return
  108. try:
  109. # the following is trivial, but we wrap it nevertheless to prevent
  110. # a silent crash of the worker thread
  111. self.execution_started.emit(
  112. thread_id,
  113. cmdname,
  114. cmdkwargs,
  115. exec_params,
  116. )
  117. # enforce return_type='generator' to get the most responsive
  118. # any command could be
  119. cmdkwargs['return_type'] = 'generator'
  120. # Unless explicitly specified, force result records instead of the
  121. # command's default transformation which might give Dataset
  122. # instances for example.
  123. if 'result_xfm' not in cmdkwargs:
  124. cmdkwargs['result_xfm'] = None
  125. if 'dataset' in cmdkwargs:
  126. # Pass actual instance, to have path arguments resolvedi
  127. # against it instead of Gooey's CWD.
  128. cmdkwargs['dataset'] = dlapi.Dataset(cmdkwargs['dataset'])
  129. except Exception as e:
  130. ce = CapturedException(e)
  131. self.execution_failed.emit(
  132. thread_id,
  133. cmdname,
  134. cmdkwargs,
  135. exec_params,
  136. ce
  137. )
  138. return
  139. gathered_results = []
  140. last_report_ts = time()
  141. try:
  142. for res in cmd(**cmdkwargs):
  143. t = time()
  144. res.update(res_override)
  145. gathered_results.append(res)
  146. if self._kaboom:
  147. raise InterruptedError()
  148. if (t - last_report_ts) > preferred_result_interval:
  149. self.results_received.emit(cls, gathered_results)
  150. gathered_results = []
  151. last_report_ts = t
  152. except Exception as e:
  153. if gathered_results:
  154. self.results_received.emit(cls, gathered_results)
  155. ce = CapturedException(e)
  156. self.execution_failed.emit(
  157. thread_id,
  158. cmdname,
  159. cmdkwargs,
  160. exec_params,
  161. ce
  162. )
  163. else:
  164. if gathered_results:
  165. self.results_received.emit(cls, gathered_results)
  166. self.execution_finished.emit(
  167. thread_id,
  168. cmdname,
  169. cmdkwargs,
  170. exec_params,
  171. )
  172. def _enable_activity_widget(
  173. self, thread_id: str, cmdname: str, cmdkwargs: dict,
  174. exec_params: dict):
  175. # thread_id, cmdname, cmdargs/kwargs, exec_params
  176. aw = self._activity_widget
  177. aw.setIcon(gooey_resources.get_best_icon('kaboom'))
  178. aw.setText(f" {cmdname}")
  179. aw.setToolButtonStyle(Qt.ToolButtonTextBesideIcon)
  180. aw.show()
  181. def _disable_activity_widget(
  182. self, thread_id: str, cmdname: str, cmdkwargs: dict,
  183. exec_params: dict, exc: CapturedException = None):
  184. self._kaboom = False
  185. # thread_id, cmdname, cmdargs/kwargs, exec_params
  186. aw = self._activity_widget
  187. aw.hide()
  188. def _stop_thread(self):
  189. self._kaboom = True
  190. @property
  191. def activity_widget(self):
  192. return self._activity_widget
  193. @property
  194. def n_running(self):
  195. return len([f for f in self._futures if f.running()])