Scheduled service maintenance on November 22


On Friday, November 22, 2024, between 06:00 CET and 18:00 CET, GIN services will undergo planned maintenance. Extended service interruptions should be expected. We will try to keep downtimes to a minimum, but recommend that users avoid critical tasks, large data uploads, or DOI requests during this time.

We apologize for any inconvenience.

bci.py 24 KB


  1. import datetime
  2. import glob
  3. import logging
  4. import multiprocessing as mp
  5. import os
  6. import pathlib
  7. import re
  8. import time
  9. from datetime import datetime as dt
  10. from multiprocessing import Value
  11. from timeit import default_timer
  12. from shutil import copyfile
  13. import yaml
  14. import munch
  15. import numpy as np
  16. from PyQt5.QtCore import QObject, QThread, pyqtSignal, pyqtSlot
  17. import aux
  18. from aux import log
  19. from paradigms import colorSpeller, exploration, question, trainingColorSpeller, feedback
  20. import subprocess
  21. '''
  22. recording_type:
  23. 0 - IDLE
  24. 1 - BASELINE
  25. 2 - STIMULUS
  26. 3 - RESPONSE
  27. '''
  28. class Bci(QObject):
  29. finished_block = pyqtSignal()
  30. history = pyqtSignal(str)
  31. def init(self, params, recording_status, decoder_decision, parent_conn, parent_conn2, parent_conn3, block_phase, audio_feedback_run, audio_fb_target): # __init__ may interfere with qthread
  32. # self.data_obj = data_obj
  33. log.debug('bci instance created...')
  34. self.params = params
  35. self.recording_status = recording_status # shared ctype int variable with data process
  36. self.decoder_decision = decoder_decision # shared ctype int variable with data process
  37. self.block_phase = block_phase # shared ctype int variable with data process
  38. self.audio_feedback_run = audio_feedback_run # shared ctype int variable with data process
  39. self.audio_fb_target = audio_fb_target
  40. self.parent_conn = parent_conn
  41. self.parent_conn2 = parent_conn2
  42. self.parent_conn3 = parent_conn3 # to communicate with speller matrix
  43. self.set_block_number()
  44. # self.fh_log_info = open(self.params.file_handling.filename_log_info, 'a') # to get a file-handle for later use
  45. # self.fh_log_debug = open(self.params.file_handling.filename_log_debug, 'a') # to get a file-handle for later use
  46. # self.fh_log_info.close()
  47. # self.fh_log_debug.close()
  48. # self.trial_number = 0
  49. self.reset_vars()
  50. self.decoder_decision_history = []
  51. self.decoder_decision_loop = -1 # pass over the decision from classifier process to bci process in recording_loop
  52. self.decision_set = False
  53. self.responses = ['yes', 'no']
  54. self.cfd = [] # store ConditionalFreqDist accross different runs
  55. return None
  56. # def emulate_bci(self, session_id=7):
  57. # data_tot = io.loadmat('/kiap/data/tom/model/trainData_rates2.mat')['trainData']
  58. # cur_data = data_tot[session_id:session_id + 1]
  59. # plt.figure(2)
  60. # plt.clf()
  61. # log.debug('EMULATE_BCI STARTED')
  62. # for jj in range(600, 2000, 20):
  63. # self.clf1.get_class(cur_data[0, 0][jj - 600:jj], jj)
  64. def reset_vars(self):
  65. self.trial_number = 0
  66. self.tp_tot = np.zeros(self.params.classifier.n_classes)
  67. self.fp_tot = np.zeros(self.params.classifier.n_classes)
  68. self.tpr = np.zeros(self.params.classifier.n_classes)
  69. self.fpr = np.zeros(self.params.classifier.n_classes)
  70. return None
  71. def set_block_number(self):
  72. block_number = 0
  73. if os.path.isfile(self.params.file_handling.filename_log_info):
  74. for line in reversed(list(open(self.params.file_handling.filename_log_info))):
  75. # with open(self.params.file_handling.filename_log, 'r') as fh:
  76. # for line in fh:
  77. if line.rfind('session:')>-1:
  78. block_number = int(re.findall('\d+', line)[-1]) # get last int from line
  79. break
  80. self.block_number = block_number # 0 if no session found in file
  81. return None
  82. def __write_to_log_info(self, msg, ts):
  83. # log.info(f'info log: {self.fh_log_info.closed}')
  84. # if self.fh_log_info.closed:
  85. with open(self.params.file_handling.filename_log_info, 'a') as self.fh_log_info:
  86. # self.fh_log_info.write('{}\t{}\n'.format(dt.now(), msg))
  87. self.fh_log_info.write('{}\t{}\n'.format(ts, msg))
  88. return None
  89. def __write_to_log_debug(self, msg, ts):
  90. # log.info(f'error log: {self.fh_log_debug.closed}')
  91. # if self.fh_log_debug.closed:
  92. with open(self.params.file_handling.filename_log_debug, 'a') as self.fh_log_debug:
  93. self.fh_log_debug.write('{}\t{}\n'.format(dt.now(), msg))
  94. return None
  95. def write_log(self, msg, targets='terminal', log_type='debug', ts=''):
  96. msg = f'{self.params.speller.type} - {msg} - {self.speller.get_current_state()}'
  97. if ts == '':
  98. ts = np.datetime64(dt.now())
  99. if targets == 'terminal' or targets == 'both':
  100. if log_type == 'info':
  101. log.info(msg)
  102. self.__write_to_log_info(msg, ts)
  103. self.__write_to_log_debug(msg, ts)
  104. elif log_type == 'debug':
  105. log.debug(msg)
  106. self.__write_to_log_debug(msg, ts)
  107. return
  108. # @pyqtSlot()
  109. def start_session(self):
  110. '''the main bci loop'''
  111. log.info('starting session')
  112. self.decoder_decision.value = 0
  113. self.speller_stim_number = 0
  114. if not self.params.file_handling.save_data:
  115. log.warning('No data will be saved. Check save_data flag !')
  116. else:
  117. # log.warning('Existing data in current date folder will be appended!')
  118. # aux.setfileattr(self.params) # add timestamps to files to make them unique
  119. # save current config.yaml and paradigm.yaml
  120. src_conf = os.path.abspath('config.yaml')
  121. dst_conf = self.params.file_handling.filename_config
  122. src_paradig = os.path.abspath('paradigm.yaml')
  123. dst_paradig = self.params.file_handling.filename_paradigm
  124. # create directories if necessary
  125. targetdir = os.path.split(dst_conf)[0]
  126. try:
  127. if not os.path.exists(targetdir):
  128. os.makedirs(targetdir)
  129. except:
  130. print("Permissions issue", "Can not create the directory " + targetdir + ". Please check your "
  131. "permissions.")
  132. log.warning("Permissions issue", "Can not create the directory " + targetdir +
  133. ". Please check your permissions.")
  134. return None
  135. try:
  136. res_conf = copyfile(src_conf, dst_conf)
  137. res_paradig = copyfile(src_paradig, dst_paradig)
  138. # also dump current config – as there may be changes
  139. config_dump_stream = open(self.params.file_handling.filename_config_dump, 'w')
  140. yaml.dump(self.params, config_dump_stream, sort_keys=False, default_flow_style=None)
  141. except Exception as e:
  142. log.error(e)
  143. log.error('Failed to save config and paradigm YAML files')
  144. self.stop_session('Aborted Session')
  145. return None
  146. log.info('Currently used config.yaml file has been saved as: ' + os.path.split(res_conf)[1])
  147. log.info('Currently used paradigm.yaml file has been saved as: ' + os.path.split(res_paradig)[1])
  148. # save git patch
  149. try:
  150. f = open(self.params.file_handling.filename_git_patch, "w")
  151. subprocess.call(['git', 'diff', '--binary', 'HEAD'], stdout=f)
  152. except Exception as e:
  153. log.error(e)
  154. log.error('Failed to save git patch')
  155. if self.params.classifier.online and \
  156. self.params.classifier.thr_window > (self.params.recording.timing.t_response/self.params.daq.spike_rates.loop_interval*1000):
  157. log.error('thr_window too large. No class will be identified')
  158. self.stop_session('Aborted Session')
  159. return None
  160. # check if there is mismatch with # of features
  161. try:
  162. if self.params.speller.type == 'exploration':
  163. # log.info('EXPLORATION')
  164. self.speller = exploration.Exploration(params=self.params)
  165. self.speller.name = 'exploration'
  166. elif self.params.speller.type == 'training_color':
  167. self.speller = trainingColorSpeller.TrainingColorSpeller(pyttsx_rate = self.params.speller.pyttsx_rate, params=self.params)
  168. self.speller.name = 'training_color'
  169. elif self.params.speller.type == 'question':
  170. self.speller = question.Question(params=self.params)
  171. self.speller.name = 'question'
  172. elif self.params.speller.type == 'feedback':
  173. self.speller = feedback.Feedback(self.audio_fb_target, params=self.params)
  174. self.speller.name = 'feedback'
  175. elif self.params.speller.type == 'color':
  176. # save user corpus
  177. src_corpus = os.path.join(self.params.paradigms.color.corpora_path, self.paradigms.color.speller_user_corpus)
  178. dst_corpus = self.params.file_handling.filename_corpus
  179. try:
  180. res_corpus = copyfile(src_corpus, dst_corpus)
  181. except Exception as e:
  182. log.error(e)
  183. log.error('Failed to save user corpus')
  184. log.info('Currently used speller_user.txt file has been saved as: ' + os.path.split(res_corpus)[1])
  185. if self.cfd == []:
  186. self.speller = colorSpeller.ColorSpeller(pyttsx_rate = self.params.speller.pyttsx_rate, params=self.params)
  187. self.speller.name = 'color'
  188. self.cfd = self.speller.general_cfd
  189. else:
  190. self.speller = colorSpeller.ColorSpeller(self.cfd, params=self.params) # use stored one to save time
  191. self.speller.engine.setProperty('rate',self.params.speller.pyttsx_rate)
  192. self.speller.name = 'color'
  193. else:
  194. log.error('No correct speller selected')
  195. self.stop_session('Aborted Session')
  196. return None
  197. except Exception as e:
  198. log.error(e)
  199. log.error('Failed to instantiate speller object')
  200. self.stop_session('Aborted Session')
  201. return None
  202. log.info(f'speller mode: {self.speller.mode}')
  203. # if self.speller.mode == 'Training' or self.speller.mode == 'Screening': #quick fix for now
  204. # os.system("sed -i 's/online: True/online: False/g' config.yaml")
  205. # log.warning('Online decoding is switched off')
  206. # else:
  207. # os.system("sed -i 's/online: False/online: True/g' config.yaml")
  208. # log.warning('Online decoding is switched on')
  209. # log.warning(f'# channels excluded for online classification: {len(self.params.classifier.exclude_channels)} !!')
  210. tic0 = time.time()
  211. self.block_number += 1
  212. self.fh_log_info = open(self.params.file_handling.filename_log_info, 'a')
  213. self.fh_log_debug = open(self.params.file_handling.filename_log_debug, 'a')
  214. self.recording_status.value = 1
  215. self.send_trigger('Block, start')
  216. # self.write_log(f'Start {self.params.speller.type}, session:{self.block_number}', 'both', 'info')
  217. # self.write_log('Recording', 'terminal', 'info')
  218. try:
  219. self.speller.present_start(
  220. audio=self.params.speller.audio) # this may have to be a different process or thread
  221. except Exception as e:
  222. log.error(e)
  223. log.debug('bci thread loop')
  224. speller_done = False
  225. while not speller_done:
  226. self.trial_number += 1
  227. self.speller_stim_number += 1
  228. elapsed_time = 0
  229. cur_state = self.speller.get_current_state()
  230. self.decoder_decision.value = -1
  231. self.decoder_decision_loop = -1
  232. self.decision_set = False
  233. log.info(f'Trial: {self.trial_number}, Speller stim number: {self.speller_stim_number}')
  234. if self.params.session.flags.bl: # 1. BASELINE
  235. self.block_phase.value = 0 # baseline started
  236. self.send_trigger('baseline, start')
  237. # tic = dt.now()
  238. # self.write_log('Baseline start', 'both', 'info')
  239. if self.trial_number == 1:
  240. t_baseline = self.params.recording.timing.t_baseline_1
  241. else:
  242. t_baseline = self.params.recording.timing.t_baseline_all
  243. if self.params.session.flags.bl_rand:
  244. t_baseline += self.params.recording.timing.t_baseline_rand * np.random.random()
  245. log.info(f"Trial: {self.trial_number}, Baseline period starts: {t_baseline}s")
  246. exit_code = self.recording_loop(dt.now(), elapsed_time, t_baseline)
  247. if exit_code == 1: # recording stopped by user
  248. return
  249. self.send_trigger('baseline, stop')
  250. elapsed_time = 0
  251. if self.params.session.flags.stimulus: # 2. STIMULUS PRESENTATION
  252. log.info(f"Trial: {self.trial_number}, Stimulus period starts")
  253. self.block_phase.value = 1 # stimulus presentation started
  254. self.send_trigger('stimulus, start')
  255. # self.write_log(f'Stimulus start', 'both', 'info')
  256. # first display current state and the present stimulus, otherwise wrong state will be displayed
  257. log.warning(self.speller.mode)
  258. if (self.speller.name=='question') and (self.speller.mode=='Free'): # allow formulation of free questions
  259. os.system("aplay /kiap/data/speller/Audio/short_beep.wav -q")
  260. input('\nPlease ask question and press enter to continue\n')
  261. else:
  262. try:
  263. self.speller.present_stimulus(audio=self.params.speller.audio) # this may have to be a different process or thread
  264. except Exception as e:
  265. log.error(e)
  266. self.send_trigger('stimulus, stop')
  267. if self.speller.name == 'exploration':
  268. os.system("aplay /kiap/data/speller/feedback/beep.wav -q")
  269. if self.params.feedback.feedback_tone:
  270. self.audio_feedback_run.value = 1
  271. time.sleep(self.params.recording.timing.t_after_stimulus)
  272. self.send_trigger('response, start')
  273. tic = dt.now()
  274. self.block_phase.value = 2 # neural response started
  275. exit_code = self.recording_loop(tic, elapsed_time, self.params.recording.timing.t_response) # 3. PATIENT NEURAL RESPONSE
  276. if exit_code > 0:
  277. return
  278. self.block_phase.value = 3 # neural response stop
  279. self.audio_feedback_run.value = 0
  280. # self.parent_conn.send(1) # send signal, only when done execution will continue
  281. # ts = self.parent_conn.recv()
  282. # self.decoder_decision_history.append(self.params.decision(self.decoder_decision.value).value)
  283. log.warning(f'decoder decision after loop:{self.decoder_decision_loop}')
  284. self.decoder_decision_history.append(aux.decision(self.decoder_decision_loop).value) # decision is copied in recording loop
  285. self.get_stats()
  286. self.send_trigger('response, stop')
  287. # self.write_log(f'Response stop', 'both', 'info', dt.now())
  288. if self.speller.name == 'exploration':
  289. os.system("aplay /kiap/data/speller/Audio/exploration/danke.wav -q")
  290. # self.recording_type.value = 0
  291. # self.write_log(f'exit code: {exit_code} - Decoder decision: {self.params.decision(self.decoder_decision_history[-1]).name}')
  292. if self.params.file_handling.save_data:
  293. self.write_log(f'Decoder decision: {aux.decision(self.decoder_decision_history[-1]).name}', log_type='info')
  294. log.info(f'Decoder decision is: {aux.decision(self.decoder_decision_history[-1]).name}')
  295. self.send_trigger(f'Decoder decision is: {aux.decision(self.decoder_decision_history[-1]).name}')
  296. if exit_code == 1: # recording stopped by user
  297. return
  298. self.parent_conn2.send(self.decoder_decision_history[-1]) # to inform real-time plot
  299. log.debug('VALUE SEND for plotting')
  300. try:
  301. speller_done = self.speller.process_result(decision=self.decoder_decision_history[-1], audio=self.params.speller.audio_result_fb) # 0: yes, 1:no, 2:baseline
  302. except Exception as e:
  303. log.error(e)
  304. # self.info(f'Current state: {self.speller.get_current_state()}')
  305. if self.params.speller.type == 'training_color':
  306. self.history.emit(f'{self.trial_number}. {cur_state[1]} - {self.speller.current_string} | TP: {self.tp_tot}, FP: {self.fp_tot}')
  307. elif hasattr(self.speller, 'current_response'):
  308. self.history.emit(f'{self.trial_number}. {cur_state[0]}, {cur_state[1]} | TP: {self.tp_tot}, FP: {self.fp_tot}')
  309. self.history.emit(f'{self.trial_number}. {cur_state[0]}, {cur_state[1]}, decision: {aux.decision(self.decoder_decision_history[-1]).name}')
  310. else:
  311. self.history.emit(f'{self.trial_number}. {cur_state[0]}, {cur_state[1]}, decision: {aux.decision(self.decoder_decision_history[-1]).name}')
  312. if self.params.speller.type == 'color' and self.params.speller.speller_matrix:
  313. self.parent_conn3.send([self.speller.current_selection, self.speller.current_string, self.speller.get_vocabulary()])
  314. log.info(f'TP: {self.tp_tot}, TPR: {self.tpr}')
  315. log.info(f'FP: {self.fp_tot}, FPR: {self.fpr}')
  316. time.sleep(0.01)
  317. if hasattr(self.speller,'current_string'): # spell whole sentence
  318. self.speller._say(self.speller.current_string.lower(), self.speller.trigger[0], self.speller.trigger[1])
  319. self.speller.close()
  320. self.stop_session('Normal Stop')
  321. # log.debug('bci elapsed time: {}'.format(time.time()-tic))
  322. log.debug('bci elapsed time: {}'.format(dt.now()-tic))
  323. return None
  324. def send_trigger(self, msg=''):
  325. # log.error(self.speller.get_current_state()[0])
  326. if self.params.speller.type == 'question':
  327. if 'yes' in self.speller.get_current_state()[0].lower():
  328. msg = 'yes, ' + msg
  329. if 'no' in self.speller.get_current_state()[0].lower():
  330. msg = 'no, ' + msg
  331. elif self.params.speller.type == 'training_color':
  332. if 'yes' in self.speller.get_current_state()[1].lower():
  333. msg = 'yes, ' + msg
  334. if 'no' in self.speller.get_current_state()[1].lower():
  335. msg = 'no, ' + msg
  336. elif self.params.speller.type == 'exploration':
  337. msg = self.speller.current_stimulus + ', ' + msg
  338. elif self.params.speller.type == 'feedback':
  339. if 'down' in self.speller.current_stimulus:
  340. msg = 'down, ' + msg
  341. elif 'up' in self.speller.current_stimulus:
  342. msg = 'up, ' + msg
  343. else: # Not sure why only 'up' and 'down' should be logged.
  344. msg = self.speller.current_stimulus + ', ' + msg
  345. trigger = f'{self.params.speller.type}, {self.speller.mode}, {msg}'
  346. self.parent_conn.send(trigger)
  347. log.info(f'send to parent_conn pipe: {trigger}')
  348. return None
  349. def recording_loop(self, tic, elapsed_time, rec_duration):
  350. log.info(f'recording status: {self.recording_status.value}')
  351. while (self.recording_status.value) and (elapsed_time <= rec_duration):
  352. # log.info(f'decoder decision inside loop1: {self.decoder_decision_loop}')
  353. if self.decoder_decision.value >= 0 and not self.decision_set:
  354. self.decoder_decision_loop = np.copy(self.decoder_decision.value)
  355. self.decision_set = True
  356. # log.info(f'decoder decision inside loop2: {self.decoder_decision_loop}')
  357. if self.params.classifier.break_loop: # break loop if decision available
  358. break
  359. else:
  360. time.sleep(self.params.recording.timing.bci_loop_interval)
  361. elapsed_time = (dt.now() - tic).total_seconds()
  362. else:
  363. log.info(f'time elapsed. decision: {self.decoder_decision_loop}')
  364. if self.recording_status.value == 0: # if user pressed stop
  365. self.stop_session()
  366. return 1
  367. return 0
  368. def stop_session(self, msg='Forced Stop'):
  369. self.send_trigger('Block, stop')
  370. # time.sleep(0.50)
  371. self.recording_status.value = 0
  372. self.audio_feedback_run.value = 0
  373. if not self.fh_log_info.closed:
  374. self.write_log('{} {}'.format(msg, self.params.speller.type), 'both', 'info')
  375. self.fh_log_info.close()
  376. if not self.fh_log_debug.closed:
  377. self.write_log('{} {}'.format(msg, self.params.speller.type), 'both', 'debug')
  378. self.fh_log_debug.close()
  379. # return self.get_data()
  380. log.info('Session stopped')
  381. self.finished_block.emit()
  382. self.reset_vars()
  383. # time.sleep(1) # wait to complete writing buffer to file in data.py
  384. # if self.params.file_handling.save_data:
  385. # aux.add_timestamps(self.params) # add timestamps to files to make them unique
  386. return
  387. def get_data(self):
  388. res = self.data_obj.read_buffer()
  389. # log.debug(res)
  390. return res
  391. # def get_params(self):
  392. # return self.params
  393. def load_config(self):
  394. try:
  395. self.params = aux.load_config()
  396. except Exception as e:
  397. log.warning(e)
  398. # self.stop_session('Invalid config')
  399. return 1
  400. return 0
  401. def get_stats(self):
  402. '''compute true positives, false positives etc'''
  403. if hasattr(self.speller, 'current_response'):
  404. log.warning(f'history: {self.decoder_decision_history}')
  405. try:
  406. for cl_id in range(self.params.classifier.n_triggers):
  407. if self.decoder_decision_history[-1] == cl_id:
  408. if self.speller.current_response == self.responses[cl_id]:
  409. self.tp_tot[cl_id] = self.tp_tot[cl_id] + 1
  410. else:
  411. self.fp_tot[cl_id] = self.fp_tot[cl_id] + 1
  412. if np.sum(np.array(self.decoder_decision_history) == cl_id) >0:
  413. self.tpr[cl_id] = self.tp_tot[cl_id] / np.sum(np.array(self.decoder_decision_history) == cl_id)
  414. self.fpr[cl_id] = self.fp_tot[cl_id] / (self.fp_tot[cl_id] + (len(self.decoder_decision_history) - np.sum(np.array(self.decoder_decision_history) == cl_id)))
  415. except Exception as e:
  416. log.error(e)
  417. log.debug(f'fp: {self.fp_tot}')
  418. return None