123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212 |
- from concurrent.futures import ThreadPoolExecutor
- import threading
- from time import time
- from types import MappingProxyType
- from typing import (
- Dict,
- )
- from PySide6.QtCore import (
- QObject,
- Signal,
- Slot,
- )
- from PySide6.QtWidgets import (
- QToolButton,
- )
- from datalad.interface.base import Interface
- from datalad.support.exceptions import CapturedException
- from datalad.utils import get_wrapped_class
- # lazy import
- dlapi = None
- class GooeyDataladCmdExec(QObject):
- """Non-blocking execution of DataLad API commands
- and Qt-signal result reporting
- """
- # thread_id, cmdname, cmdargs/kwargs, exec_params
- execution_started = Signal(str, str, MappingProxyType, MappingProxyType)
- execution_finished = Signal(str, str, MappingProxyType, MappingProxyType)
- # thread_id, cmdname, cmdargs/kwargs, exec_params, CapturedException
- execution_failed = Signal(str, str, MappingProxyType, MappingProxyType, CapturedException)
- results_received = Signal(Interface, list)
- def __init__(self):
- super().__init__()
- aw = QToolButton()
- aw.setAutoRaise(True)
- aw.clicked.connect(self._stop_thread)
- aw.hide()
- self._activity_widget = aw
- self.execution_started.connect(self._enable_activity_widget)
- self.execution_finished.connect(self._disable_activity_widget)
- self.execution_failed.connect(self._disable_activity_widget)
- # flag whether a running thread should stop ASAP
- self._kaboom = False
- self._threadpool = ThreadPoolExecutor(
- max_workers=1,
- thread_name_prefix='gooey_datalad_cmdexec',
- # some callable to start at each thread execution
- #initializer=self.
- #initargs=
- )
- self._futures = set()
- # connect maintenance slot to give us an accurate
- # assessment of ongoing commands
- self.execution_finished.connect(self._update_futures)
- self.execution_failed.connect(self._update_futures)
- def _update_futures(self):
- self._futures = set(f for f in self._futures if f.running())
- @Slot(str, dict)
- def execute(self, cmd: str,
- kwargs: MappingProxyType or None = None,
- exec_params: MappingProxyType or None = None):
- if kwargs is None:
- kwargs = dict()
- if exec_params is None:
- exec_params = dict()
- global dlapi
- if dlapi is None:
- from datalad import api as dl
- dlapi = dl
- # right now, we have no use for the returned future, because result
- # communication and thread finishing are handled by emitting Qt signals
- self._futures.add(self._threadpool.submit(
- self._cmdexec_thread,
- cmd,
- kwargs,
- exec_params,
- ))
- def _cmdexec_thread(
- self, cmdname: str,
- cmdkwargs: MappingProxyType,
- exec_params: MappingProxyType):
- """The code is executed in a worker thread"""
- # we need to amend the record below, make a mutable version
- cmdkwargs = cmdkwargs.copy()
- print('EXECINTHREAD', cmdname, cmdkwargs, exec_params)
- preferred_result_interval = exec_params.get(
- 'preferred_result_interval', 1.0)
- res_override = exec_params.get(
- 'result_override', {})
- # get_ident() is an int, but in the future we might want to move
- # to PY3.8+ native thread IDs, so let's go with a string identifier
- # right away
- thread_id = str(threading.get_ident())
- # get functor to execute, resolve name against full API
- try:
- cmd = getattr(dlapi, cmdname)
- cls = get_wrapped_class(cmd)
- except Exception as e:
- self.execution_failed.emit(
- thread_id,
- cmdname,
- cmdkwargs,
- exec_params,
- CapturedException(e),
- )
- return
- try:
- # the following is trivial, but we wrap it nevertheless to prevent
- # a silent crash of the worker thread
- self.execution_started.emit(
- thread_id,
- cmdname,
- cmdkwargs,
- exec_params,
- )
- # enforce return_type='generator' to get the most responsive
- # any command could be
- cmdkwargs['return_type'] = 'generator'
- # Unless explicitly specified, force result records instead of the
- # command's default transformation which might give Dataset
- # instances for example.
- if 'result_xfm' not in cmdkwargs:
- cmdkwargs['result_xfm'] = None
- if 'dataset' in cmdkwargs:
- # Pass actual instance, to have path arguments resolvedi
- # against it instead of Gooey's CWD.
- cmdkwargs['dataset'] = dlapi.Dataset(cmdkwargs['dataset'])
- except Exception as e:
- ce = CapturedException(e)
- self.execution_failed.emit(
- thread_id,
- cmdname,
- cmdkwargs,
- exec_params,
- ce
- )
- return
- gathered_results = []
- last_report_ts = time()
- try:
- for res in cmd(**cmdkwargs):
- t = time()
- res.update(res_override)
- gathered_results.append(res)
- if self._kaboom:
- raise InterruptedError()
- if (t - last_report_ts) > preferred_result_interval:
- self.results_received.emit(cls, gathered_results)
- gathered_results = []
- last_report_ts = t
- except Exception as e:
- if gathered_results:
- self.results_received.emit(cls, gathered_results)
- ce = CapturedException(e)
- self.execution_failed.emit(
- thread_id,
- cmdname,
- cmdkwargs,
- exec_params,
- ce
- )
- else:
- if gathered_results:
- self.results_received.emit(cls, gathered_results)
- self.execution_finished.emit(
- thread_id,
- cmdname,
- cmdkwargs,
- exec_params,
- )
- def _enable_activity_widget(
- self, thread_id: str, cmdname: str, cmdkwargs: dict,
- exec_params: dict):
- # thread_id, cmdname, cmdargs/kwargs, exec_params
- aw = self._activity_widget
- aw.setText(f"KABOOM {cmdname}")
- aw.show()
- def _disable_activity_widget(
- self, thread_id: str, cmdname: str, cmdkwargs: dict,
- exec_params: dict, exc: CapturedException = None):
- self._kaboom = False
- # thread_id, cmdname, cmdargs/kwargs, exec_params
- aw = self._activity_widget
- aw.hide()
- def _stop_thread(self):
- self._kaboom = True
- @property
- def activity_widget(self):
- return self._activity_widget
- @property
- def n_running(self):
- return len([f for f in self._futures if f.running()])
|