|
- import datetime
- import glob
- import logging
- import multiprocessing as mp
- import os
- import pathlib
- import re
- import time
- from datetime import datetime as dt
- from multiprocessing import Value
- from timeit import default_timer
- from shutil import copyfile
- import yaml
- import munch
- import numpy as np
- from PyQt5.QtCore import QObject, QThread, pyqtSignal, pyqtSlot
- import aux
- from aux import log
- from paradigms import colorSpeller, exploration, question, trainingColorSpeller, feedback
- import subprocess
- '''
- recording_type:
- 0 - IDLE
- 1 - BASELINE
- 2 - STIMULUS
- 3 - RESPONSE
- '''
- class Bci(QObject):
- finished_block = pyqtSignal()
- history = pyqtSignal(str)
- def init(self, params, recording_status, decoder_decision, parent_conn, parent_conn2, parent_conn3, block_phase, audio_feedback_run, audio_fb_target): # __init__ may interfere with qthread
- # self.data_obj = data_obj
-
- log.debug('bci instance created...')
- self.params = params
- self.recording_status = recording_status # shared ctype int variable with data process
- self.decoder_decision = decoder_decision # shared ctype int variable with data process
- self.block_phase = block_phase # shared ctype int variable with data process
- self.audio_feedback_run = audio_feedback_run # shared ctype int variable with data process
- self.audio_fb_target = audio_fb_target
- self.parent_conn = parent_conn
- self.parent_conn2 = parent_conn2
- self.parent_conn3 = parent_conn3 # to communicate with speller matrix
- self.set_block_number()
- # self.fh_log_info = open(self.params.file_handling.filename_log_info, 'a') # to get a file-handle for later use
- # self.fh_log_debug = open(self.params.file_handling.filename_log_debug, 'a') # to get a file-handle for later use
- # self.fh_log_info.close()
- # self.fh_log_debug.close()
- # self.trial_number = 0
- self.reset_vars()
- self.decoder_decision_history = []
- self.decoder_decision_loop = -1 # pass over the decision from classifier process to bci process in recording_loop
- self.decision_set = False
- self.responses = ['yes', 'no']
- self.cfd = [] # store ConditionalFreqDist accross different runs
- return None
- # def emulate_bci(self, session_id=7):
- # data_tot = io.loadmat('/kiap/data/tom/model/trainData_rates2.mat')['trainData']
- # cur_data = data_tot[session_id:session_id + 1]
- # plt.figure(2)
- # plt.clf()
- # log.debug('EMULATE_BCI STARTED')
- # for jj in range(600, 2000, 20):
- # self.clf1.get_class(cur_data[0, 0][jj - 600:jj], jj)
- def reset_vars(self):
- self.trial_number = 0
- self.tp_tot = np.zeros(self.params.classifier.n_classes)
- self.fp_tot = np.zeros(self.params.classifier.n_classes)
- self.tpr = np.zeros(self.params.classifier.n_classes)
- self.fpr = np.zeros(self.params.classifier.n_classes)
-
- return None
- def set_block_number(self):
- block_number = 0
- if os.path.isfile(self.params.file_handling.filename_log_info):
- for line in reversed(list(open(self.params.file_handling.filename_log_info))):
- # with open(self.params.file_handling.filename_log, 'r') as fh:
- # for line in fh:
- if line.rfind('session:')>-1:
- block_number = int(re.findall('\d+', line)[-1]) # get last int from line
- break
- self.block_number = block_number # 0 if no session found in file
-
- return None
- def __write_to_log_info(self, msg, ts):
- # log.info(f'info log: {self.fh_log_info.closed}')
- # if self.fh_log_info.closed:
- with open(self.params.file_handling.filename_log_info, 'a') as self.fh_log_info:
- # self.fh_log_info.write('{}\t{}\n'.format(dt.now(), msg))
- self.fh_log_info.write('{}\t{}\n'.format(ts, msg))
- return None
- def __write_to_log_debug(self, msg, ts):
- # log.info(f'error log: {self.fh_log_debug.closed}')
- # if self.fh_log_debug.closed:
- with open(self.params.file_handling.filename_log_debug, 'a') as self.fh_log_debug:
- self.fh_log_debug.write('{}\t{}\n'.format(dt.now(), msg))
- return None
- def write_log(self, msg, targets='terminal', log_type='debug', ts=''):
- msg = f'{self.params.speller.type} - {msg} - {self.speller.get_current_state()}'
- if ts == '':
- ts = np.datetime64(dt.now())
- if targets == 'terminal' or targets == 'both':
- if log_type == 'info':
- log.info(msg)
- self.__write_to_log_info(msg, ts)
- self.__write_to_log_debug(msg, ts)
- elif log_type == 'debug':
- log.debug(msg)
- self.__write_to_log_debug(msg, ts)
- return
- # @pyqtSlot()
- def start_session(self):
- '''the main bci loop'''
- log.info('starting session')
- self.decoder_decision.value = 0
- self.speller_stim_number = 0
- if not self.params.file_handling.save_data:
- log.warning('No data will be saved. Check save_data flag !')
- else:
- # log.warning('Existing data in current date folder will be appended!')
- # aux.setfileattr(self.params) # add timestamps to files to make them unique
- # save current config.yaml and paradigm.yaml
- src_conf = os.path.abspath('config.yaml')
- dst_conf = self.params.file_handling.filename_config
- src_paradig = os.path.abspath('paradigm.yaml')
- dst_paradig = self.params.file_handling.filename_paradigm
- # create directories if necessary
- targetdir = os.path.split(dst_conf)[0]
- try:
- if not os.path.exists(targetdir):
- os.makedirs(targetdir)
- except:
- print("Permissions issue", "Can not create the directory " + targetdir + ". Please check your "
- "permissions.")
- log.warning("Permissions issue", "Can not create the directory " + targetdir +
- ". Please check your permissions.")
- return None
- try:
- res_conf = copyfile(src_conf, dst_conf)
- res_paradig = copyfile(src_paradig, dst_paradig)
-
- # also dump current config – as there may be changes
- config_dump_stream = open(self.params.file_handling.filename_config_dump, 'w')
- yaml.dump(self.params, config_dump_stream, sort_keys=False, default_flow_style=None)
-
- except Exception as e:
- log.error(e)
- log.error('Failed to save config and paradigm YAML files')
- self.stop_session('Aborted Session')
- return None
- log.info('Currently used config.yaml file has been saved as: ' + os.path.split(res_conf)[1])
- log.info('Currently used paradigm.yaml file has been saved as: ' + os.path.split(res_paradig)[1])
-
- # save git patch
- try:
- f = open(self.params.file_handling.filename_git_patch, "w")
- subprocess.call(['git', 'diff', '--binary', 'HEAD'], stdout=f)
-
- except Exception as e:
- log.error(e)
- log.error('Failed to save git patch')
-
- if self.params.classifier.online and \
- self.params.classifier.thr_window > (self.params.recording.timing.t_response/self.params.daq.spike_rates.loop_interval*1000):
- log.error('thr_window too large. No class will be identified')
- self.stop_session('Aborted Session')
- return None
- # check if there is mismatch with # of features
- try:
- if self.params.speller.type == 'exploration':
- # log.info('EXPLORATION')
- self.speller = exploration.Exploration(params=self.params)
- self.speller.name = 'exploration'
-
- elif self.params.speller.type == 'training_color':
- self.speller = trainingColorSpeller.TrainingColorSpeller(pyttsx_rate = self.params.speller.pyttsx_rate, params=self.params)
- self.speller.name = 'training_color'
-
- elif self.params.speller.type == 'question':
- self.speller = question.Question(params=self.params)
- self.speller.name = 'question'
-
- elif self.params.speller.type == 'feedback':
- self.speller = feedback.Feedback(self.audio_fb_target, params=self.params)
- self.speller.name = 'feedback'
-
- elif self.params.speller.type == 'color':
- # save user corpus
- src_corpus = os.path.join(self.params.paradigms.color.corpora_path, self.paradigms.color.speller_user_corpus)
- dst_corpus = self.params.file_handling.filename_corpus
-
- try:
- res_corpus = copyfile(src_corpus, dst_corpus)
-
- except Exception as e:
- log.error(e)
- log.error('Failed to save user corpus')
- log.info('Currently used speller_user.txt file has been saved as: ' + os.path.split(res_corpus)[1])
-
- if self.cfd == []:
- self.speller = colorSpeller.ColorSpeller(pyttsx_rate = self.params.speller.pyttsx_rate, params=self.params)
- self.speller.name = 'color'
- self.cfd = self.speller.general_cfd
- else:
- self.speller = colorSpeller.ColorSpeller(self.cfd, params=self.params) # use stored one to save time
- self.speller.engine.setProperty('rate',self.params.speller.pyttsx_rate)
- self.speller.name = 'color'
- else:
- log.error('No correct speller selected')
- self.stop_session('Aborted Session')
- return None
- except Exception as e:
- log.error(e)
- log.error('Failed to instantiate speller object')
- self.stop_session('Aborted Session')
- return None
- log.info(f'speller mode: {self.speller.mode}')
- # if self.speller.mode == 'Training' or self.speller.mode == 'Screening': #quick fix for now
- # os.system("sed -i 's/online: True/online: False/g' config.yaml")
- # log.warning('Online decoding is switched off')
- # else:
- # os.system("sed -i 's/online: False/online: True/g' config.yaml")
- # log.warning('Online decoding is switched on')
- # log.warning(f'# channels excluded for online classification: {len(self.params.classifier.exclude_channels)} !!')
- tic0 = time.time()
- self.block_number += 1
- self.fh_log_info = open(self.params.file_handling.filename_log_info, 'a')
- self.fh_log_debug = open(self.params.file_handling.filename_log_debug, 'a')
- self.recording_status.value = 1
- self.send_trigger('Block, start')
- # self.write_log(f'Start {self.params.speller.type}, session:{self.block_number}', 'both', 'info')
- # self.write_log('Recording', 'terminal', 'info')
- try:
- self.speller.present_start(
- audio=self.params.speller.audio) # this may have to be a different process or thread
- except Exception as e:
- log.error(e)
- log.debug('bci thread loop')
- speller_done = False
- while not speller_done:
- self.trial_number += 1
- self.speller_stim_number += 1
- elapsed_time = 0
- cur_state = self.speller.get_current_state()
- self.decoder_decision.value = -1
- self.decoder_decision_loop = -1
- self.decision_set = False
- log.info(f'Trial: {self.trial_number}, Speller stim number: {self.speller_stim_number}')
- if self.params.session.flags.bl: # 1. BASELINE
- self.block_phase.value = 0 # baseline started
- self.send_trigger('baseline, start')
- # tic = dt.now()
- # self.write_log('Baseline start', 'both', 'info')
- if self.trial_number == 1:
- t_baseline = self.params.recording.timing.t_baseline_1
- else:
- t_baseline = self.params.recording.timing.t_baseline_all
- if self.params.session.flags.bl_rand:
- t_baseline += self.params.recording.timing.t_baseline_rand * np.random.random()
- log.info(f"Trial: {self.trial_number}, Baseline period starts: {t_baseline}s")
- exit_code = self.recording_loop(dt.now(), elapsed_time, t_baseline)
- if exit_code == 1: # recording stopped by user
- return
- self.send_trigger('baseline, stop')
- elapsed_time = 0
- if self.params.session.flags.stimulus: # 2. STIMULUS PRESENTATION
- log.info(f"Trial: {self.trial_number}, Stimulus period starts")
-
- self.block_phase.value = 1 # stimulus presentation started
- self.send_trigger('stimulus, start')
- # self.write_log(f'Stimulus start', 'both', 'info')
- # first display current state and the present stimulus, otherwise wrong state will be displayed
- log.warning(self.speller.mode)
- if (self.speller.name=='question') and (self.speller.mode=='Free'): # allow formulation of free questions
- os.system("aplay /kiap/data/speller/Audio/short_beep.wav -q")
- input('\nPlease ask question and press enter to continue\n')
- else:
- try:
- self.speller.present_stimulus(audio=self.params.speller.audio) # this may have to be a different process or thread
- except Exception as e:
- log.error(e)
- self.send_trigger('stimulus, stop')
- if self.speller.name == 'exploration':
- os.system("aplay /kiap/data/speller/feedback/beep.wav -q")
- if self.params.feedback.feedback_tone:
- self.audio_feedback_run.value = 1
- time.sleep(self.params.recording.timing.t_after_stimulus)
- self.send_trigger('response, start')
- tic = dt.now()
- self.block_phase.value = 2 # neural response started
- exit_code = self.recording_loop(tic, elapsed_time, self.params.recording.timing.t_response) # 3. PATIENT NEURAL RESPONSE
- if exit_code > 0:
- return
- self.block_phase.value = 3 # neural response stop
- self.audio_feedback_run.value = 0
- # self.parent_conn.send(1) # send signal, only when done execution will continue
- # ts = self.parent_conn.recv()
- # self.decoder_decision_history.append(self.params.decision(self.decoder_decision.value).value)
- log.warning(f'decoder decision after loop:{self.decoder_decision_loop}')
- self.decoder_decision_history.append(aux.decision(self.decoder_decision_loop).value) # decision is copied in recording loop
- self.get_stats()
- self.send_trigger('response, stop')
- # self.write_log(f'Response stop', 'both', 'info', dt.now())
- if self.speller.name == 'exploration':
- os.system("aplay /kiap/data/speller/Audio/exploration/danke.wav -q")
- # self.recording_type.value = 0
-
- # self.write_log(f'exit code: {exit_code} - Decoder decision: {self.params.decision(self.decoder_decision_history[-1]).name}')
- if self.params.file_handling.save_data:
- self.write_log(f'Decoder decision: {aux.decision(self.decoder_decision_history[-1]).name}', log_type='info')
- log.info(f'Decoder decision is: {aux.decision(self.decoder_decision_history[-1]).name}')
- self.send_trigger(f'Decoder decision is: {aux.decision(self.decoder_decision_history[-1]).name}')
- if exit_code == 1: # recording stopped by user
- return
- self.parent_conn2.send(self.decoder_decision_history[-1]) # to inform real-time plot
- log.debug('VALUE SEND for plotting')
- try:
- speller_done = self.speller.process_result(decision=self.decoder_decision_history[-1], audio=self.params.speller.audio_result_fb) # 0: yes, 1:no, 2:baseline
- except Exception as e:
- log.error(e)
- # self.info(f'Current state: {self.speller.get_current_state()}')
- if self.params.speller.type == 'training_color':
- self.history.emit(f'{self.trial_number}. {cur_state[1]} - {self.speller.current_string} | TP: {self.tp_tot}, FP: {self.fp_tot}')
- elif hasattr(self.speller, 'current_response'):
- self.history.emit(f'{self.trial_number}. {cur_state[0]}, {cur_state[1]} | TP: {self.tp_tot}, FP: {self.fp_tot}')
- self.history.emit(f'{self.trial_number}. {cur_state[0]}, {cur_state[1]}, decision: {aux.decision(self.decoder_decision_history[-1]).name}')
- else:
- self.history.emit(f'{self.trial_number}. {cur_state[0]}, {cur_state[1]}, decision: {aux.decision(self.decoder_decision_history[-1]).name}')
- if self.params.speller.type == 'color' and self.params.speller.speller_matrix:
- self.parent_conn3.send([self.speller.current_selection, self.speller.current_string, self.speller.get_vocabulary()])
- log.info(f'TP: {self.tp_tot}, TPR: {self.tpr}')
- log.info(f'FP: {self.fp_tot}, FPR: {self.fpr}')
- time.sleep(0.01)
- if hasattr(self.speller,'current_string'): # spell whole sentence
- self.speller._say(self.speller.current_string.lower(), self.speller.trigger[0], self.speller.trigger[1])
- self.speller.close()
- self.stop_session('Normal Stop')
- # log.debug('bci elapsed time: {}'.format(time.time()-tic))
- log.debug('bci elapsed time: {}'.format(dt.now()-tic))
- return None
- def send_trigger(self, msg=''):
- # log.error(self.speller.get_current_state()[0])
- if self.params.speller.type == 'question':
- if 'yes' in self.speller.get_current_state()[0].lower():
- msg = 'yes, ' + msg
- if 'no' in self.speller.get_current_state()[0].lower():
- msg = 'no, ' + msg
- elif self.params.speller.type == 'training_color':
- if 'yes' in self.speller.get_current_state()[1].lower():
- msg = 'yes, ' + msg
- if 'no' in self.speller.get_current_state()[1].lower():
- msg = 'no, ' + msg
- elif self.params.speller.type == 'exploration':
- msg = self.speller.current_stimulus + ', ' + msg
- elif self.params.speller.type == 'feedback':
- if 'down' in self.speller.current_stimulus:
- msg = 'down, ' + msg
- elif 'up' in self.speller.current_stimulus:
- msg = 'up, ' + msg
- else: # Not sure why only 'up' and 'down' should be logged.
- msg = self.speller.current_stimulus + ', ' + msg
- trigger = f'{self.params.speller.type}, {self.speller.mode}, {msg}'
- self.parent_conn.send(trigger)
- log.info(f'send to parent_conn pipe: {trigger}')
- return None
- def recording_loop(self, tic, elapsed_time, rec_duration):
- log.info(f'recording status: {self.recording_status.value}')
- while (self.recording_status.value) and (elapsed_time <= rec_duration):
- # log.info(f'decoder decision inside loop1: {self.decoder_decision_loop}')
- if self.decoder_decision.value >= 0 and not self.decision_set:
- self.decoder_decision_loop = np.copy(self.decoder_decision.value)
- self.decision_set = True
- # log.info(f'decoder decision inside loop2: {self.decoder_decision_loop}')
- if self.params.classifier.break_loop: # break loop if decision available
- break
- else:
- time.sleep(self.params.recording.timing.bci_loop_interval)
- elapsed_time = (dt.now() - tic).total_seconds()
- else:
- log.info(f'time elapsed. decision: {self.decoder_decision_loop}')
- if self.recording_status.value == 0: # if user pressed stop
- self.stop_session()
- return 1
- return 0
- def stop_session(self, msg='Forced Stop'):
- self.send_trigger('Block, stop')
- # time.sleep(0.50)
- self.recording_status.value = 0
- self.audio_feedback_run.value = 0
- if not self.fh_log_info.closed:
- self.write_log('{} {}'.format(msg, self.params.speller.type), 'both', 'info')
- self.fh_log_info.close()
- if not self.fh_log_debug.closed:
- self.write_log('{} {}'.format(msg, self.params.speller.type), 'both', 'debug')
- self.fh_log_debug.close()
- # return self.get_data()
- log.info('Session stopped')
- self.finished_block.emit()
- self.reset_vars()
- # time.sleep(1) # wait to complete writing buffer to file in data.py
- # if self.params.file_handling.save_data:
- # aux.add_timestamps(self.params) # add timestamps to files to make them unique
- return
- def get_data(self):
- res = self.data_obj.read_buffer()
- # log.debug(res)
- return res
- # def get_params(self):
- # return self.params
- def load_config(self):
- try:
- self.params = aux.load_config()
- except Exception as e:
- log.warning(e)
- # self.stop_session('Invalid config')
- return 1
- return 0
- def get_stats(self):
- '''compute true positives, false positives etc'''
- if hasattr(self.speller, 'current_response'):
- log.warning(f'history: {self.decoder_decision_history}')
- try:
- for cl_id in range(self.params.classifier.n_triggers):
- if self.decoder_decision_history[-1] == cl_id:
- if self.speller.current_response == self.responses[cl_id]:
- self.tp_tot[cl_id] = self.tp_tot[cl_id] + 1
- else:
- self.fp_tot[cl_id] = self.fp_tot[cl_id] + 1
- if np.sum(np.array(self.decoder_decision_history) == cl_id) >0:
- self.tpr[cl_id] = self.tp_tot[cl_id] / np.sum(np.array(self.decoder_decision_history) == cl_id)
- self.fpr[cl_id] = self.fp_tot[cl_id] / (self.fp_tot[cl_id] + (len(self.decoder_decision_history) - np.sum(np.array(self.decoder_decision_history) == cl_id)))
- except Exception as e:
- log.error(e)
- log.debug(f'fp: {self.fp_tot}')
- return None
|