'''
description: class for neurofeedback
author: Ioannis Vlachos
date: 08.01.2019

Copyright (c) 2019 Ioannis Vlachos. All rights reserved.
'''

import os
import random

import munch
import numpy as np
import pyaudio
from scipy import signal
from scipy.io import wavfile

import aux
from aux import log
# from paradigm import Paradigm

def norm2freq(norm_rate, alpha=360, beta=80):
    '''Map a normalized rate (0..1) linearly to a tone frequency in Hz.'''
    return norm_rate * alpha + beta
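
# With the defaults alpha=360 and beta=80, a normalized rate of 0.0 maps to
# 80 Hz, 0.5 to 260 Hz, and 1.0 to 440 Hz.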

def note(freq, t_len, amp=1, rate=44100, mode='sin'):
    '''Synthesize a sine or sawtooth tone of frequency freq (Hz), duration
    t_len (s), and amplitude amp, sampled at rate (Hz).'''
    n_sample = int(t_len * rate)
    factor = float(freq) * np.pi * 2 / rate
    if mode == 'saw':
        data = signal.sawtooth(np.arange(n_sample) * factor, .25) * amp
    else:
        data = np.sin(np.arange(n_sample) * factor) * amp
    # Fade in/out over the first and last fifth of the tone to avoid clicks.
    # linspace guarantees exactly `fade` samples; arange with a float step
    # can produce one sample too many and break the multiplications below.
    fade = int(n_sample / 5)
    fade_in = np.linspace(0., 1., fade, endpoint=False)
    fade_out = np.linspace(1., 0., fade, endpoint=False)
    data[:fade] = np.multiply(data[:fade], fade_in)
    data[-fade:] = np.multiply(data[-fade:], fade_out)
    return data.astype(np.float32)
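
# Usage sketch: half a second of a 440 Hz sawtooth at quarter amplitude,
# ready to be written to a paFloat32 PyAudio stream.
#   tone = note(440, 0.5, amp=.25, mode='saw')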

def load_wav(wav_file, channels=2, rate=44100):
    '''Load a wav file as a float32 array with the requested number of
    channels, resampled to rate (Hz) if necessary.'''
    r, wf = wavfile.read(wav_file)
    if wf.dtype == 'int16':
        wf = np.float32(wf) / pow(2, 15)         # scale int16 samples to [-1, 1)
    if wf.ndim == 1:                             # mono files are read as 1-d arrays
        wf = wf.reshape((-1, 1))
    if wf.shape[1] > channels:                   # drop surplus channels
        wf = wf[:, :channels]
    elif wf.shape[1] == 1 and channels > 1:      # replicate mono across channels
        wf = np.repeat(wf, channels, axis=1)
    if r != rate:                                # resample to the target rate
        n_s = int(wf.shape[0] * rate / r)
        wf = signal.resample(wf, n_s)
    return wf
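
# Usage sketch (the path is only an example): load a cue as a stereo float32
# array at 44.1 kHz.
#   cue = load_wav('sounds/success.wav')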

RATE = 44100


# class Feedback(Paradigm):
class Feedback():
    '''
    Paradigm for neurofeedback based on action potentials and/or LFP.
    '''

    def __init__(self, audio_fb_target, params=None, **kwargs):
        '''Initialize the paradigm.'''
        self.audio_fb_target = audio_fb_target
        self.params = params
        config = self.params.paradigms.feedback
        log.info(config)

        self.audio_path = config.audio_path
        self.num_stim = config.number_of_stim
        self.mode = config.mode[config.selected_mode]
        self.states = config.states

        self.list_of_stimuli = self._create_list()
        self.current_stimulus = self.list_of_stimuli.pop(0)
        self.history = []

        pa = pyaudio.PyAudio()
        self.stream = pa.open(output=True, channels=2, rate=RATE,
                              format=pyaudio.paFloat32)

        # Pre-render the end-of-trial cue sounds as raw byte buffers.
        # (np.ndarray.tostring() is deprecated; tobytes() is its replacement.)
        self.sounds = {'success': load_wav(config.sounds.success).tobytes(),
                       'fail': load_wav(config.sounds.fail).tobytes()}

    def _create_list(self):
        '''
        Create a randomly ordered list of stimuli: every non-baseline state
        appears num_stim times, and if a 'baseline' state is configured it
        is interleaved before each stimulus.

        Return:
            state_names: list of strings
        '''
        state_names = [k for k in self.states.keys() if k != 'baseline'] * self.num_stim
        random.seed()
        random.shuffle(state_names)

        if 'baseline' in self.states.keys():
            state_names = [k for state in state_names for k in ('baseline', state)]
        return state_names
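
    # With states {baseline, up, down} and num_stim == 2, the list might come
    # out as e.g. ['baseline', 'up', 'baseline', 'down', 'baseline', 'down',
    # 'baseline', 'up'].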

    def present_stimulus(self, audio=True):
        '''
        Present the current stimulus by playing its target tone on the
        audio stream.

        Return:
            current_stimulus: string (the stimulus that is presented)
        '''
        log.info("Presenting stimulus")
        if audio:
            alpha = self.params.feedback.alpha
            beta = self.params.feedback.beta
            length = self.params.feedback.target_tone_length

            # Expose the target values of the current state to the feedback loop.
            for i in range(len(self.audio_fb_target)):
                self.audio_fb_target[i] = self.states[self.current_stimulus][i]

            target_freq_float = norm2freq(self.audio_fb_target[0], alpha, beta)

            target_tone = note(target_freq_float, length, amp=.25, rate=RATE, mode='saw')
            # Duplicate the mono tone into interleaved stereo samples.
            tone = np.repeat(target_tone.reshape((-1, 1)), 2, axis=1).flatten()

            if self.stream.is_stopped():
                self.stream.start_stream()
            self.stream.write(tone.tobytes())
            self.stream.stop_stream()

        log.debug('stimulus end')
        return self.current_stimulus

    def process_result(self, decision, **kwargs):
        '''Play the end-of-trial cue and advance to the next stimulus. The
        decision argument is kept for compatibility with the other spellers.

        Return:
            True if the stimulus list is exhausted, else False
        '''
        config = self.params.paradigms.feedback
        hit = ((decision == aux.decision.yes.value and self.current_stimulus == 'up') or
               (decision == aux.decision.no.value and self.current_stimulus == 'down'))
        miss = ((decision != aux.decision.yes.value and self.current_stimulus == 'up') or
                (decision != aux.decision.no.value and self.current_stimulus == 'down'))

        if config.play_end_feedback.success and hit:
            if self.stream.is_stopped():
                self.stream.start_stream()
            self.stream.write(self.sounds['success'])
            self.stream.stop_stream()
        elif config.play_end_feedback.fail and miss:
            if self.stream.is_stopped():
                self.stream.start_stream()
            self.stream.write(self.sounds['fail'])
            self.stream.stop_stream()

        if self.list_of_stimuli:
            self.current_stimulus = self.list_of_stimuli.pop(0)
            return False
        else:
            return True
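
    # Decision semantics, as encoded above: an 'up' trial counts as a hit
    # when the decoder answers yes, a 'down' trial when it answers no;
    # 'baseline' trials trigger neither cue.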

    def get_current_state(self):
        '''Return the paradigm name and the current stimulus.'''
        return 'feedback', self.current_stimulus

    def get_mode(self):
        '''
        Return the mode of the used paradigm.

        Return:
            mode: string (mode of the paradigm)
        '''
        return self.mode

    def save_log(self):
        '''Save the log. (Not implemented.)'''
        pass

    def close(self):
        return

    def _read_config(self, file):
        '''
        Read a configuration yaml file.

        Parameters:
            file: string (name of the configuration yaml file)

        Return:
            config: munch structure (configuration file)
        '''
        # fromYAML is a classmethod on Munch (available when PyYAML is
        # installed); the munch module itself does not export it.
        with open(file) as stream:
            return munch.Munch.fromYAML(stream)
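
# Minimal usage sketch. The munch config below is hypothetical and only fills
# in the fields Feedback actually reads; real sessions load it from yaml via
# _read_config.
#
#   params = munch.munchify({
#       'paradigms': {'feedback': {
#           'audio_path': 'wav', 'number_of_stim': 2,
#           'selected_mode': 0, 'mode': ['default'],
#           'states': {'baseline': [0.5], 'up': [1.0], 'down': [0.0]},
#           'sounds': {'success': 'success.wav', 'fail': 'fail.wav'},
#           'play_end_feedback': {'success': True, 'fail': True}}},
#       'feedback': {'alpha': 360, 'beta': 80, 'target_tone_length': 1.0}})
#   fb = Feedback(audio_fb_target=[0.0], params=params)
#   fb.present_stimulus()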