123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136 |
- '''
- description: testing speller
- author: Ioannis Vlachos
- date: 19.09.18
- Copyright (c) 2018 Ioannis Vlachos.
- All rights reserved.'''
- import importlib
- import logging
- import sys
- import time
- from random import randint
- import munch
- import pyaudio
- from colorlog import ColoredFormatter
- import aux
- from aux import log
- from paradigms import colorSpeller, exploration, question, trainingColorSpeller, feedback
- from multiprocessing import Array
- importlib.reload(trainingColorSpeller)
- importlib.reload(colorSpeller)
- importlib.reload(aux)
- paradigm_type = aux.args.speller
- # paradigm_type = 'question'
- params = aux.load_config()
- try:
- if paradigm_type == 'training_color':
- sp = trainingColorSpeller.TrainingColorSpeller(pyttsx_rate = params.speller.pyttsx_rate, params=params)
- elif paradigm_type == 'color':
- sp = colorSpeller.ColorSpeller(pyttsx_rate = params.speller.pyttsx_rate, params=params)
- elif paradigm_type == 'exploration':
- sp = exploration.Exploration(params=params)
- elif paradigm_type == 'question':
- sp = question.Question(params=params)
- elif paradigm_type == 'feedback':
- audio_fb_target = Array('d', 3)
- sp = feedback.Feedback(audio_fb_target, params=params)
- else:
- log.error('Speller does not exist')
- sys.exit(1)
- except Exception as e:
- log.error(e)
- log.error('Speller creation failed')
- raise(e)
- sys.exit(1)
- # for ii in range(10): # used to test refcounts to speller objects
- # log.info(f'index {ii}')
- # # print(sys.getrefcount(sp))
- # sp = exploration.Exploration()
- # # print(sys.getrefcount(sp))
- # time.sleep(.1)
- finish = False
- cnt = 0
- # answers = [0,0,0,1,1,1,1,1,1,0,1]
- # # answers = [0,1,-1,0,1,0,1,1]
- # answers = [1,1,0,0,0,1,1,1,0,1,1,0,1,0,1]
- # answers = [1,1,1,1,0,1,1,1,1,0,1] # program beenden
- # answers = [0,0]
- while not finish:
- # print('Current state: ' + sp.current_response)
- if hasattr(sp, 'current_selection'):
- current_selection = sp.current_selection
- else:
- current_selection = ''
- # Present the stimulus
- try:
- t1 = time.time()
- stim = sp.present_stimulus(audio=params.speller.audio)
- t2 = time.time()
- time.sleep(0.01)
- # print(f'Time elapsed: {t2 - t1}')
- # log.info(f'stim: {sp.current_stimulus}')
- if hasattr(sp, 'current_selection'):
- log.info(f'current state: {sp.current_selection}')
- if hasattr(sp, 'current_stimulus'):
- log.info(f'current state: {sp.current_stimulus}')
- if hasattr(sp, 'get_vocabulary'):
- log.info(f'current probable words: {sp.get_vocabulary()}')
- except Exception as e:
- log.error(e)
- # log.error('Speller failed')
- # sys.exit(1)
-
- if sp.mode == 'Validation' or sp.mode == 'Free' : # simulate decoder output
- print(cnt)
- # if 1:
- # answer = randint(0, 1)
- # if (cnt>=1 and cnt<2) or (cnt>=4 and cnt<=5):
- # answer = params.decision.unclassified
- # else:
-
- # answer = answers[cnt]
- user_answer = input(stim + '? ')
- if user_answer is 'y':
- answer = aux.decision.yes
- elif user_answer is 'q':
- break
- else:
- answer = aux.decision.no
- # answer = 0
- # if answer==0:
- # decision = params.decision.yes
- # else:
- # decision = params.decision.no
- else:
- answer = aux.decision.unclassified
- # log.info(f'decision: {params.decision(answer).name}')
- # log.info(sp.current_stimulus)
-
- finish = sp.process_result(aux.decision(answer).value)
- print(finish)
- # print(f'cnt: {cnt}, current selection: {current_selection}, decision: {params.decision(answer).name} , Current state: {sp.get_current_state()[0]}, {sp.get_current_state()[1]}, current string: {sp.current_string}')
- cnt += 1
- if hasattr(sp, 'current_string'):
- sp._say(sp.current_string.lower(), sp.trigger[0], sp.trigger[1])
- # Close the paradigm
- sp.close()
- matrix.terminate()
|