123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154 |
from concurrent.futures import ThreadPoolExecutor
import threading
from time import time
from typing import (
    Dict,
    Optional,
)

from PySide6.QtCore import (
    QObject,
    Signal,
    Slot,
)

from datalad.interface.base import Interface
from datalad.support.exceptions import CapturedException
from datalad.utils import get_wrapped_class
# lazy import: `datalad.api` is only imported on the first call of
# `GooeyDataladCmdExec.execute()`
dlapi = None


class GooeyDataladCmdExec(QObject):
    """Non-blocking execution of DataLad API commands
    and Qt-signal result reporting

    Commands are submitted to a thread pool with a single worker, hence
    they are executed one at a time. Progress and outcome are communicated
    exclusively via the Qt signals declared below; the futures returned by
    the pool are only kept for bookkeeping (see `n_running`).
    """
    # thread_id, cmdname, cmdargs/kwargs, exec_params
    execution_started = Signal(str, str, dict, dict)
    execution_finished = Signal(str, str, dict, dict)
    # thread_id, cmdname, cmdargs/kwargs, exec_params, CapturedException
    # (the declaration must match the five arguments passed to `emit()`)
    execution_failed = Signal(str, str, dict, dict, CapturedException)
    # command class (as reported by `get_wrapped_class()`), result records
    results_received = Signal(Interface, list)

    def __init__(self):
        super().__init__()

        self._threadpool = ThreadPoolExecutor(
            max_workers=1,
            thread_name_prefix='gooey_datalad_cmdexec',
            # some callable to start at each thread execution
            #initializer=self.
            #initargs=
        )
        self._futures = set()
        # connect maintenance slot to give us an accurate
        # assessment of ongoing commands
        self.execution_finished.connect(self._update_futures)
        self.execution_failed.connect(self._update_futures)

    def _update_futures(self):
        """Drop completed futures from the internal bookkeeping set

        Futures that are still pending (submitted, but not yet started)
        are kept, so they are not lost from the accounting while the single
        worker is busy with a previous command.
        """
        self._futures = {f for f in self._futures if not f.done()}

    @Slot(str, dict)
    def execute(self, cmd: str,
                kwargs: Optional[Dict] = None,
                exec_params: Optional[Dict] = None):
        """Schedule a DataLad command for execution in the worker thread

        Parameters
        ----------
        cmd: str
          Name of the command; resolved as an attribute of `datalad.api`.
        kwargs: dict, optional
          Keyword arguments for the command. Note that the worker modifies
          this mapping in place (see `_cmdexec_thread()`).
        exec_params: dict, optional
          Execution parameters. Supported keys are
          'preferred_result_interval' and 'result_override'.
        """
        global dlapi
        if kwargs is None:
            kwargs = dict()
        if exec_params is None:
            exec_params = dict()

        if dlapi is None:
            # deferred until first use
            from datalad import api as dl
            dlapi = dl

        # right now, we have no use for the returned future, because result
        # communication and thread finishing are handled by emitting Qt
        # signals. It is only tracked to be able to report `n_running`.
        self._futures.add(self._threadpool.submit(
            self._cmdexec_thread,
            cmd,
            kwargs,
            exec_params,
        ))

    def _cmdexec_thread(self, cmdname: str, cmdkwargs: Dict,
                        exec_params: Dict):
        """The code is executed in a worker thread

        Emits `execution_started` once the command was resolved, then
        `results_received` with batches of result records, and lastly
        either `execution_finished` or `execution_failed`.

        Parameters
        ----------
        cmdname: str
          Name of the command in `datalad.api`.
        cmdkwargs: dict
          Keyword arguments for the command. Modified in place:
          'return_type' is forced to 'generator', 'result_xfm' defaults to
          `None`, and a 'dataset' value is replaced by a `Dataset` instance.
        exec_params: dict
          'preferred_result_interval' (default 1.0) is the minimum number
          of seconds between two `results_received` emissions;
          'result_override' (default `{}`) is merged into every result
          record.
        """
        print('EXECINTHREAD', cmdname, cmdkwargs, exec_params)
        preferred_result_interval = exec_params.get(
            'preferred_result_interval', 1.0)
        res_override = exec_params.get(
            'result_override', {})
        # get_ident() is an int, but in the future we might want to move
        # to PY3.8+ native thread IDs, so let's go with a string identifier
        # right away
        thread_id = str(threading.get_ident())
        # common leading arguments of all execution_* signals
        sig_args = (thread_id, cmdname, cmdkwargs, exec_params)

        # get functor to execute, resolve name against full API
        try:
            cmd = getattr(dlapi, cmdname)
            cls = get_wrapped_class(cmd)
        except Exception as e:
            self.execution_failed.emit(*sig_args, CapturedException(e))
            return

        self.execution_started.emit(*sig_args)

        gathered_results = []
        try:
            # Argument preparation is done inside the `try` block too.
            # Otherwise an exception (e.g. from `Dataset()`) would end the
            # worker silently after `execution_started` was emitted, and
            # no `execution_failed` would ever follow.

            # enforce return_type='generator' to get the most responsive
            # any command could be
            cmdkwargs['return_type'] = 'generator'
            # Unless explicitly specified, force result records instead of
            # the command's default transformation which might give Dataset
            # instances for example.
            cmdkwargs.setdefault('result_xfm', None)
            if 'dataset' in cmdkwargs:
                # Pass actual instance, to have path arguments resolved
                # against it instead of Gooey's CWD.
                cmdkwargs['dataset'] = dlapi.Dataset(cmdkwargs['dataset'])

            last_report_ts = time()
            for res in cmd(**cmdkwargs):
                t = time()
                res.update(res_override)
                gathered_results.append(res)
                # batch results to avoid flooding receivers with signals
                if (t - last_report_ts) > preferred_result_interval:
                    self.results_received.emit(cls, gathered_results)
                    gathered_results = []
                    last_report_ts = t
        except Exception as e:
            # report whatever was gathered before the failure
            if gathered_results:
                self.results_received.emit(cls, gathered_results)
            self.execution_failed.emit(*sig_args, CapturedException(e))
        else:
            # flush the remaining results
            if gathered_results:
                self.results_received.emit(cls, gathered_results)
            self.execution_finished.emit(*sig_args)

    @property
    def n_running(self):
        """Number of tracked futures that are currently being executed"""
        return sum(1 for f in self._futures if f.running())
|