In [1]:
from scipy.io import wavfile
import soundfile as sf
import os

cont_noise_data, cont_noise_s_rate = sf.read(os.path.join('..', 'assets', 'stream2.wav'), dtype='float32')

# cont_noise_s_rate, cont_noise_data = wavfile.read(os.path.join('..', 'assets', 'stream2.wav'))
# #cont_noise_data = cont_noise_data.astype(np.float32)
cont_noise_data.shape, cont_noise_s_rate

((2880512, 2), 48000)

In [2]:
cont_noise_data, cont_noise_s_rate = sf.read(os.path.join('..', 'assets', 'chirp_rate192KHz_100ms_2000Hz_30000Hz.wav'), dtype='float32')

# cont_noise_s_rate, cont_noise_data = wavfile.read(os.path.join('..', 'assets', 'stream2.wav'))
# #cont_noise_data = cont_noise_data.astype(np.float32)
cont_noise_data.shape, cont_noise_s_rate

((19200,), 192000)

In [3]:
import matplotlib.pyplot as plt

plt.plot(cont_noise_data[:, 0][:200000])

IndexError: too many indices for array: array is 1-dimensional, but 2 were indexed

In [5]:
%%writefile sound.py
import numpy as np
import time
from scipy.signal import lfilter
from functools import reduce

import os
import threading
import random

class SoundController:
 # https://python-sounddevice.readthedocs.io/en/0.3.15/api/streams.html#sounddevice.OutputStream
 
 default_cfg = {
 "device": [1, 26],
 "n_channels": 10,
 "sounds": {
 "noise": {"amp": 0.2, "channels": [6, 8]},
 "background": {"freq": 660, "amp": 0.1, "duration": 0.05, "harmonics": True, "channels": [3, 8]},
 "target": {"freq": 1320, "amp": 0.1, "duration": 0.05, "harmonics": True, "channels": [3, 8]}, 
 "distractor1": {"freq": 860, "amp": 0.15, "duration": 0.05, "harmonics": True, "channels": [6, 8], "enabled": False},
 "distractor2": {"freq": 1060, "amp": 0.25, "duration": 0.05, "harmonics": True, "channels": [6, 8], "enabled": False},
 "distractor3": {"freq": 1320, "amp": 0.2, "duration": 0.05, "harmonics": True, "channels": [6, 8], "enabled": False}
 },
 "pulse_duration": 0.05,
 "sample_rate": 44100,
 "latency": 0.25,
 "volume": 0.7,
 "roving": 5.0,
 "file_path": "sounds.csv"
 }
 
 commutator = {
 -1: 'noise',
 0: 'silence',
 1: 'background',
 2: 'target',
 3: 'distractor1',
 4: 'distractor2',
 5: 'distractor3',
 6: 'distractor4',
 7: 'distractor5'
 }
 
 @classmethod
 def get_pure_tone(cls, freq, duration, sample_rate=44100):
 x = np.linspace(0, duration * freq * 2*np.pi, int(duration*sample_rate), dtype=np.float32)
 return np.sin(x)

 @classmethod
 def get_harm_stack(cls, base_freq, duration, threshold=1500, sample_rate=44100):
 harmonics = [x * base_freq for x in np.arange(20) + 2 if x * base_freq < threshold] # first 20 enouch
 freqs = [base_freq] + harmonics
 x = np.linspace(0, duration, int(sample_rate * duration))
 y = reduce(lambda x, y: x + y, [(1./(i+1)) * np.sin(base_freq * 2 * np.pi * x) for i, base_freq in enumerate(freqs)])
 return y / y.max() # norm to -1 to 1
 
 @classmethod
 def get_cos_window(cls, tone, win_duration, sample_rate=44100):
 x = np.linspace(0, np.pi/2, int(win_duration * sample_rate), dtype=np.float32)
 onset = np.sin(x)
 middle = np.ones(len(tone) - 2 * len(x))
 offset = np.cos(x)
 return np.concatenate([onset, middle, offset])

 @classmethod
 def get_tone_stack(cls, cfg):
 # silence
 #silence = np.zeros(9600, dtype='float32')
 silence = np.zeros(int(cfg['sample_rate']/1000), dtype='float32')
 sounds = {'silence': np.column_stack([silence for x in range(cfg['n_channels'])])}

 # noise
 filter_a = np.array([0.0075, 0.0225, 0.0225, 0.0075])
 filter_b = np.array([1.0000,-2.1114, 1.5768,-0.4053])

 noise = np.random.randn(int(cfg['latency'] * cfg['sample_rate'])) # it was 250ms of noise, now use cfg['latency'] instead of hardcoded 0.25
 noise = lfilter(filter_a, filter_b, noise)
 noise = noise / np.abs(noise).max() * cfg['sounds']['noise']['amp']
 noise = noise.astype(np.float32)
 empty = np.zeros((len(noise), cfg['n_channels']), dtype='float32')
 for ch in cfg['sounds']['noise']['channels']:
 empty[:, ch-1] = noise
 sounds['noise'] = empty
 
 # all other sounds
 for key, snd in cfg['sounds'].items():
 if key == 'noise' or ('enabled' in snd and not snd['enabled']):
 continue # skip noise or unused sounds
 
 if 'harmonics' in snd and snd['harmonics']:
 tone = cls.get_harm_stack(snd['freq'], snd['duration'], sample_rate=cfg['sample_rate']) * cfg['volume']
 else:
 tone = cls.get_pure_tone(snd['freq'], snd['duration'], cfg['sample_rate']) * cfg['volume']
 tone = tone * cls.get_cos_window(tone, 0.01, cfg['sample_rate']) # onset / offset
 tone = tone * snd['amp'] # amplitude
 
 sound = np.zeros([len(tone), cfg['n_channels']], dtype='float32')
 for j in snd['channels']:
 sound[:, j-1] = tone
 
 sounds[key] = sound

 return sounds
 
 @classmethod
 def scale(cls, orig_s_rate, target_s_rate, orig_data):
 factor = target_s_rate / orig_s_rate
 x_orig = np.linspace(0, int(factor * len(orig_data)), len(orig_data))
 x_target = np.linspace(0, int(factor * len(orig_data)), int(factor * len(orig_data)))
 return np.interp(x_target, x_orig, orig_data)
 
 @classmethod
 def run(cls, selector, status, cfg, commutator):
 """
 selector mp.Value object to set the sound to be played
 status mp.Value object to stop the loop
 """
 import sounddevice as sd # must be inside the function
 import soundfile as sf
 import numpy as np
 import time
 
 # this is a continuous noise shit
 if cfg['cont_noise']['enabled']:
 #cont_noise_s_rate, cont_noise_data = wavfile.read(cfg['cont_noise']['filepath'])
 cont_noise_data, cont_noise_s_rate = sf.read(cfg['cont_noise']['filepath'], dtype='float32') # float32!!
 target_s_rate = cfg['sample_rate']
 orig_s_rate = cont_noise_s_rate
 if len(cont_noise_data.shape) > 1:
 orig_data = cont_noise_data[:, 0]
 else:
 orig_data = cont_noise_data
 cont_noise_target = cls.scale(orig_s_rate, target_s_rate, orig_data) * cfg['cont_noise']['amp']
 #cont_noise_target = cont_noise_data * cfg['cont_noise']['amp']
 c_noise_pointer = 0
 
 print(cfg['cont_noise']['amp'])
 
 # regular sounds
 sounds = cls.get_tone_stack(cfg)

 sd.default.device = cfg['device']
 sd.default.samplerate = cfg['sample_rate']
 stream = sd.OutputStream(samplerate=cfg['sample_rate'], channels=cfg['n_channels'], dtype='float32', blocksize=256)
 stream.start()

 next_beat = time.time() + cfg['latency']
 with open(cfg['file_path'], 'w') as f:
 f.write("time,id\n")

 while status.value > 0:
 if status.value == 2 or (status.value == 1 and selector.value == -1): # running state or masking noise
 t0 = time.time()
 if t0 < next_beat:
 time.sleep(0.0005) # not to spin the wheels too much
 if stream.write_available > sounds['silence'].shape[0]:
 block_to_write = sounds['silence'].copy() # 2D matrix time x channels
 if cfg['cont_noise']['enabled']:
 if c_noise_pointer + block_to_write.shape[0] > len(cont_noise_target):
 c_noise_pointer = 0
 cont_noise_block = cont_noise_target[c_noise_pointer:c_noise_pointer + block_to_write.shape[0]]
 for ch in cfg['cont_noise']['channels']:
 block_to_write[:, ch-1] += cont_noise_block
 c_noise_pointer += block_to_write.shape[0]

 stream.write(block_to_write) # silence
 continue

 roving = 10**((np.random.rand() * cfg['roving'] - cfg['roving']/2.0)/20.)
 roving = roving if int(selector.value) > -1 else 1 # no roving for noise
 block_to_write = sounds[commutator[int(selector.value)]] * roving # this is a 2D time x channels

 if cfg['cont_noise']['enabled']:
 if c_noise_pointer + block_to_write.shape[0] > len(cont_noise_target):
 c_noise_pointer = 0
 cont_noise_block = cont_noise_target[c_noise_pointer:c_noise_pointer + block_to_write.shape[0]]
 for ch in cfg['cont_noise']['channels']:
 block_to_write[:, ch-1] += cont_noise_block
 c_noise_pointer += block_to_write.shape[0]
 
 stream.write(block_to_write)
 
 if status.value == 2:
 with open(cfg['file_path'], 'a') as f:
 f.write(",".join([str(x) for x in (t0, selector.value)]) + "\n")

 next_beat += cfg['latency']
 
 #if stream.write_available > 2:
 # stream.write(sounds['silence']) # silence
 
 else: # idle state
 next_beat = time.time() + cfg['latency']
 time.sleep(0.005)
 
 stream.stop()
 stream.close()
 print('Sound stopped')

 
class ContinuousSoundStream:
 
 default_cfg = {
 'wav_file': os.path.join('..', 'assets', 'stream1.wav'),
 'chunk_duration': 20,
 'chunk_offset': 2
 }
 
 def __init__(self, cfg):
 from scipy.io import wavfile
 import sounddevice as sd

 self.cfg = cfg
 self.stopped = False
 self.samplerate, self.data = wavfile.read(cfg['wav_file'])
 self.stream = sd.OutputStream(samplerate=self.samplerate, channels=2, dtype=self.data.dtype)

 def start(self):
 self._th = threading.Thread(target=self.update, args=())
 self._th.start()

 def stop(self):
 self.stopped = True
 self._th.join()
 print('Continuous sound stream released')
 
 def update(self):
 self.stream.start()
 print('Continuous sound stream started at %s Hz' % (self.samplerate))
 
 offset = int(self.cfg['chunk_offset'] * self.samplerate)
 chunk = int(self.cfg['chunk_duration'] * self.samplerate)
 
 while not self.stopped:
 start_idx = offset + np.random.randint(self.data.shape[0] - 2 * offset - chunk)
 end_idx = start_idx + chunk
 self.stream.write(self.data[start_idx:end_idx])
 
 self.stream.stop()
 self.stream.close()
 
 
class SoundControllerPR:
 
 default_cfg = {
 "device": [1, 26],
 "n_channels": 10,
 "sounds": {
 "noise": {"amp": 0.2, "duration": 2.0, "channels": [6, 8]},
 "target": {"freq": 660, "amp": 0.1, "duration": 2.0}, 
 },
 "sample_rate": 44100,
 "volume": 0.7,
 "file_path": "sounds.csv"
 }
 
 def __init__(self, status, cfg):
 import sounddevice as sd # must be inside the function
 import numpy as np
 import time

 sd.default.device = cfg['device']
 sd.default.samplerate = cfg['sample_rate']
 self.stream = sd.OutputStream(samplerate=cfg['sample_rate'], channels=cfg['n_channels'], dtype='float32', blocksize=256)
 self.stream.start()

 self.timers = []
 self.status = status
 self.cfg = cfg
 
 # noise (not assigned to channels)
 filter_a = np.array([0.0075, 0.0225, 0.0225, 0.0075])
 filter_b = np.array([1.0000,-2.1114, 1.5768,-0.4053])

 noise = np.random.randn(int(cfg['sounds']['noise']['duration'] * cfg['sample_rate']))
 noise = lfilter(filter_a, filter_b, noise)
 noise = noise / np.abs(noise).max() * cfg['sounds']['noise']['amp']
 noise = noise.astype(np.float32)

 # target (not assigned to channels)
 sample_rate = cfg['sample_rate']
 target_cfg = cfg['sounds']['target']

 tone = SoundController.get_pure_tone(target_cfg['freq'], target_cfg['duration'], sample_rate=cfg['sample_rate'])
 tone = tone * SoundController.get_cos_window(tone, target_cfg['window'], sample_rate=cfg['sample_rate'])

 if target_cfg['number'] > 1:
 silence = np.zeros( int(target_cfg['iti'] * cfg['sample_rate']) )
 tone_with_iti = np.concatenate([tone, silence])
 target = np.concatenate([tone_with_iti for i in range(target_cfg['number'] - 1)])
 target = np.concatenate([target, tone])
 else:
 target = tone
 
 target = target * target_cfg['amp'] # amplitude
 
 #snd = cfg['sounds']['target']
 #target = SoundController.get_pure_tone(snd['freq'], snd['duration'], cfg['sample_rate']) * cfg['volume']
 #target = target * SoundController.get_cos_window(target, 0.01, cfg['sample_rate']) # onset / offset
 #target = target * snd['amp'] # amplitude
 
 self.sounds = {'noise': noise, 'target': target}
 
 def target(self, hd_angle):
 to_play = np.zeros((len(self.sounds['target']), self.cfg['n_channels']), dtype='float32')
 channel = random.choice(self.cfg['sounds']['target']['channels']) # random speaker!
 
 to_play[:, channel-1] = self.sounds['target']
 
 t0 = time.time()
 with open(self.cfg['file_path'], 'a') as f:
 f.write(",".join([str(x) for x in (t0, 2, channel)]) + "\n")
 
 self.stream.write(to_play)
 
 def noise(self):
 to_play = np.zeros((len(self.sounds['noise']), self.cfg['n_channels']), dtype='float32')
 for ch in self.cfg['sounds']['noise']['channels']:
 to_play[:, ch-1] = self.sounds['noise']
 
 ch1 = self.cfg['sounds']['noise']['channels'][0]
 t0 = time.time()
 with open(self.cfg['file_path'], 'a') as f:
 f.write(",".join([str(x) for x in (t0, -1, ch1)]) + "\n")
 
 self.stream.write(to_play)
 
 def play_non_blocking(self, sound_id, hd_angle=0):
 if sound_id == 'target':
 tf = threading.Timer(0, self.target, args=[hd_angle])
 elif sound_id == 'noise':
 tf = threading.Timer(0, self.noise, args=[])
 tf.start()
 self.timers.append(tf)
 
 def stop(self):
 for t in self.timers:
 t.cancel()
 self.stream.stop()
 self.stream.close()

Overwriting sound.py


In [None]:
from sound import SoundController
import numpy as np

sample_rate = 96000
target_cfg = {"freq": 660, "amp": 0.1, "duration": 0.1, "window": 0.005, "iti": 0.1, "number": 7}

tone = SoundController.get_pure_tone(target_cfg['freq'], target_cfg['duration'], sample_rate=sample_rate)
tone = tone * SoundController.get_cos_window(tone, target_cfg['window'], sample_rate=sample_rate)

if target_cfg['number'] > 1:
 silence = np.zeros( int(target_cfg['iti'] * sample_rate) )
 tone_with_iti = np.concatenate([tone, silence])
 pulses = np.concatenate([tone_with_iti for i in range(target_cfg['number'] - 1)])
 pulses = np.concatenate([pulses, tone])
else:
 pulses = tone

import matplotlib.pyplot as plt
plt.plot(pulses)

### Testing sound controller for Precedence project

In [None]:
import numpy as np
import time, os
from sound import SoundControllerPR

In [89]:
sound_cfg = {
 "device": [1, 24],
 "n_channels": 14,
 "sounds": {
 "noise": {"amp": 0.01, "duration": 10.0, "channels": [6]}, # 69 DBA (amp 0.01, volume 0.1)
 "target": {"freq": 660, "amp": 0.3, "duration": 0.015, "window": 0.005, "iti": 0.1, "number": 10, "channels": [5]}
 },
 "sample_rate": 44100,
 "volume": 0.1,
 "file_path": "sounds.csv"
}

In [87]:
sc = SoundControllerPR(1, sound_cfg)

In [88]:
sc.noise() # 5 is 3
#sc.play_non_blocking('noise', 0)

In [85]:
sc.stop()

### Building sound stack

In [None]:
import numpy as np
import time, os
from sound import SoundController

cfg = SoundController.default_cfg
sounds = SoundController.get_tone_stack(cfg)

In [None]:
import matplotlib.pyplot as plt

_ = plt.plot(sounds['noise'])

### Roving and onset window

In [None]:
from sound import SoundController

duration = 0.05
freq = 440

tone = SoundController.get_pure_tone(freq, duration)
tone = tone * SoundController.get_cos_window(tone, 0.01)

In [None]:
import matplotlib.pyplot as plt
plt.plot(tone)

### Building noise

In [None]:
import numpy as np
from scipy.signal import lfilter

sample_rate = 44100

filter_a = np.array([0.0075, 0.0225, 0.0225, 0.0075])
filter_b = np.array([1.0000,-2.1114, 1.5768,-0.4053])

noise = np.random.randn(3*sample_rate)
noise = lfilter(filter_a, filter_b, noise)
noise = noise / np.abs(noise).max() * 0.5
noise = noise.astype(np.float32)

In [None]:
import matplotlib.pyplot as plt

plt.plot(noise)

### Testing sound pulses without MP

In [None]:
import numpy as np
import time, os
import multiprocess as mp
from sound import SoundController

# sound selector: 0 - silence, 1 - tone 1, 2 - tone 2
selector = mp.Value('i', -1)

# loggin status: 1 - idle, 2 - running, 0 - stopped
status = mp.Value('i', 2)

cfg = SoundController.default_cfg
#cfg['device'] = [1, 26] # 'M-Audio Delta ASIO'
SoundController.run(selector, status, cfg)

# nothing happens for a second
time.sleep(1)

status.value = 2
for i in range(2):
 time.sleep(1)
 selector.value = -1 if selector.value == 2 else 2

# stop
status.value = 0
time.sleep(0.2)

### Testing sound pulses with MP

In [None]:
import numpy as np
import time, os
import multiprocess as mp
from sound import SoundController

# sound selector: 0 - silence, 1 - tone 1, 2 - tone 2
selector = mp.Value('i', -1)

# loggin status: 1 - idle, 2 - running, 0 - stopped
status = mp.Value('i', 1)

cfg = SoundController.default_cfg
cfg['device'] = [1, 26] # 'M-Audio Delta ASIO'
sc = mp.Process(target=SoundController.run, args=(selector, status, cfg))
sc.start()

# nothing happens for a second
time.sleep(1)

status.value = 2
for i in range(3):
 time.sleep(1)
 selector.value = -1 if selector.value == 1 else 1

# stop
status.value = 0
time.sleep(0.2)
sc.join()

In [None]:
import matplotlib.pyplot as plt

ds = np.loadtxt('test_sound_log.csv', delimiter=',', skiprows=1)
plt.plot(np.diff(ds[:, 0]))

### Sounddevice playground

In [None]:
import sounddevice as sd
[(i, x) for i, x in enumerate(sd.query_devices()) if x['name'].find('ASIO') > 0]

In [None]:
import sounddevice as sd
import numpy as np
#import keyboard # using module keyboard

sd.default.device = [1, 24]
sd.default.samplerate = 44100
stream = sd.OutputStream(samplerate=44100, channels=14, dtype='float32')

In [None]:
# 3rd channel - left arena speaker
# 1st channel - right arena speaker

In [None]:
duration = 2.5

x1 = np.linspace(0, duration * 220 * 2*np.pi, int(duration*44100), dtype=np.float32)
x2 = np.linspace(0, duration * 440 * 2*np.pi, int(duration*44100), dtype=np.float32)
y1 = np.sin(x1)
y2 = np.sin(x2)
sil = np.zeros(len(x1), dtype=np.float32)

In [None]:
stream.start()
stream.write(np.column_stack([sil, sil, sil, sil, sil, sil, sil, sil, sil, y1, sil, sil, sil, sil]) * 0.8)
stream.stop()

# [None, None, 1, 2, 3, 4, 5, 6, 7, 8, None, None, None, None]

In [None]:
stream.start()

try:
 while True: # making a loop
 if keyboard.is_pressed('q'): # if key 'q' is pressed 
 break # finishing the loop

 stream.write(np.column_stack([y1, sil, sil, sil, sil, sil, sil, sil, sil, sil]) * 0.8)
 #stream.write(np.column_stack([y2, y2, y2, y2, y2, y2, y2, y2, y2, y2]) * 0.8)
 
finally:
 stream.stop()

## Permanent sound in a separate audio stream

In [None]:
import sounddevice as sd
import numpy as np
import os
import threading
from scipy.io import wavfile

In [None]:
sd.query_devices()

In [None]:
wav_fname = os.path.join('..', 'assets', 'stream1.wav')
samplerate, data = wavfile.read(wav_fname)

print(samplerate, data.shape, data.dtype)

In [None]:
stream = sd.OutputStream(samplerate=samplerate, channels=2, dtype=data.dtype)

stream.start()

for i in range(5):
 stream.write(data[50000:100000])
 print('playing')
 
stream.stop()

In [None]:
cfg = ContinuousSoundStream.default_cfg

cst = ContinuousSoundStream(cfg)
cst.start()

In [None]:
cst.stop()