{ "cells": [ { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "((2880512, 2), 48000)" ] }, "execution_count": 1, "metadata": {}, "output_type": "execute_result" } ], "source": [ "from scipy.io import wavfile\n", "import soundfile as sf\n", "import os\n", "\n", "cont_noise_data, cont_noise_s_rate = sf.read(os.path.join('..', 'assets', 'stream2.wav'), dtype='float32')\n", "\n", "# cont_noise_s_rate, cont_noise_data = wavfile.read(os.path.join('..', 'assets', 'stream2.wav'))\n", "# #cont_noise_data = cont_noise_data.astype(np.float32)\n", "cont_noise_data.shape, cont_noise_s_rate" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "((19200,), 192000)" ] }, "execution_count": 2, "metadata": {}, "output_type": "execute_result" } ], "source": [ "cont_noise_data, cont_noise_s_rate = sf.read(os.path.join('..', 'assets', 'chirp_rate192KHz_100ms_2000Hz_30000Hz.wav'), dtype='float32')\n", "\n", "# cont_noise_s_rate, cont_noise_data = wavfile.read(os.path.join('..', 'assets', 'stream2.wav'))\n", "# #cont_noise_data = cont_noise_data.astype(np.float32)\n", "cont_noise_data.shape, cont_noise_s_rate" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "ename": "IndexError", "evalue": "too many indices for array: array is 1-dimensional, but 2 were indexed", "output_type": "error", "traceback": [ "\u001b[1;31m---------------------------------------------------------------------------\u001b[0m", "\u001b[1;31mIndexError\u001b[0m Traceback (most recent call last)", "\u001b[1;32m\u001b[0m in \u001b[0;36m\u001b[1;34m\u001b[0m\n\u001b[0;32m 1\u001b[0m \u001b[1;32mimport\u001b[0m \u001b[0mmatplotlib\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mpyplot\u001b[0m \u001b[1;32mas\u001b[0m \u001b[0mplt\u001b[0m\u001b[1;33m\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 2\u001b[0m \u001b[1;33m\u001b[0m\u001b[0m\n\u001b[1;32m----> 3\u001b[1;33m \u001b[0mplt\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mplot\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mcont_noise_data\u001b[0m\u001b[1;33m[\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m,\u001b[0m \u001b[1;36m0\u001b[0m\u001b[1;33m]\u001b[0m\u001b[1;33m[\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;36m200000\u001b[0m\u001b[1;33m]\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m", "\u001b[1;31mIndexError\u001b[0m: too many indices for array: array is 1-dimensional, but 2 were indexed" ] } ], "source": [ "import matplotlib.pyplot as plt\n", "\n", "plt.plot(cont_noise_data[:, 0][:200000])" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Overwriting sound.py\n" ] } ], "source": [ "%%writefile sound.py\n", "import numpy as np\n", "import time\n", "from scipy.signal import lfilter\n", "from functools import reduce\n", "\n", "import os\n", "import threading\n", "import random\n", "\n", "class SoundController:\n", " # https://python-sounddevice.readthedocs.io/en/0.3.15/api/streams.html#sounddevice.OutputStream\n", " \n", " default_cfg = {\n", " \"device\": [1, 26],\n", " \"n_channels\": 10,\n", " \"sounds\": {\n", " \"noise\": {\"amp\": 0.2, \"channels\": [6, 8]},\n", " \"background\": {\"freq\": 660, \"amp\": 0.1, \"duration\": 0.05, \"harmonics\": True, \"channels\": [3, 8]},\n", " \"target\": {\"freq\": 1320, \"amp\": 0.1, \"duration\": 0.05, \"harmonics\": True, \"channels\": [3, 8]}, \n", " \"distractor1\": {\"freq\": 860, \"amp\": 0.15, \"duration\": 0.05, \"harmonics\": True, \"channels\": [6, 8], \"enabled\": False},\n", " \"distractor2\": {\"freq\": 1060, \"amp\": 0.25, \"duration\": 0.05, \"harmonics\": True, \"channels\": [6, 8], \"enabled\": False},\n", " \"distractor3\": {\"freq\": 1320, \"amp\": 0.2, \"duration\": 0.05, \"harmonics\": True, \"channels\": [6, 8], \"enabled\": False}\n", " },\n", " \"pulse_duration\": 0.05,\n", " \"sample_rate\": 44100,\n", " \"latency\": 0.25,\n", " \"volume\": 0.7,\n", " \"roving\": 5.0,\n", " \"file_path\": \"sounds.csv\"\n", " }\n", " \n", " commutator = {\n", " -1: 'noise',\n", " 0: 'silence',\n", " 1: 'background',\n", " 2: 'target',\n", " 3: 'distractor1',\n", " 4: 'distractor2',\n", " 5: 'distractor3',\n", " 6: 'distractor4',\n", " 7: 'distractor5'\n", " }\n", " \n", " @classmethod\n", " def get_pure_tone(cls, freq, duration, sample_rate=44100):\n", " x = np.linspace(0, duration * freq * 2*np.pi, int(duration*sample_rate), dtype=np.float32)\n", " return np.sin(x)\n", "\n", " @classmethod\n", " def get_harm_stack(cls, base_freq, duration, threshold=1500, sample_rate=44100):\n", " harmonics = [x * base_freq for x in np.arange(20) + 2 if x * base_freq < threshold] # first 20 enouch\n", " freqs = [base_freq] + harmonics\n", " x = np.linspace(0, duration, int(sample_rate * duration))\n", " y = reduce(lambda x, y: x + y, [(1./(i+1)) * np.sin(base_freq * 2 * np.pi * x) for i, base_freq in enumerate(freqs)])\n", " return y / y.max() # norm to -1 to 1\n", " \n", " @classmethod\n", " def get_cos_window(cls, tone, win_duration, sample_rate=44100):\n", " x = np.linspace(0, np.pi/2, int(win_duration * sample_rate), dtype=np.float32)\n", " onset = np.sin(x)\n", " middle = np.ones(len(tone) - 2 * len(x))\n", " offset = np.cos(x)\n", " return np.concatenate([onset, middle, offset])\n", "\n", " @classmethod\n", " def get_tone_stack(cls, cfg):\n", " # silence\n", " #silence = np.zeros(9600, dtype='float32')\n", " silence = np.zeros(int(cfg['sample_rate']/1000), dtype='float32')\n", " sounds = {'silence': np.column_stack([silence for x in range(cfg['n_channels'])])}\n", "\n", " # noise\n", " filter_a = np.array([0.0075, 0.0225, 0.0225, 0.0075])\n", " filter_b = np.array([1.0000,-2.1114, 1.5768,-0.4053])\n", "\n", " noise = np.random.randn(int(cfg['latency'] * cfg['sample_rate'])) # it was 250ms of noise, now use cfg['latency'] instead of hardcoded 0.25\n", " noise = lfilter(filter_a, filter_b, noise)\n", " noise = noise / np.abs(noise).max() * cfg['sounds']['noise']['amp']\n", " noise = noise.astype(np.float32)\n", " empty = np.zeros((len(noise), cfg['n_channels']), dtype='float32')\n", " for ch in cfg['sounds']['noise']['channels']:\n", " empty[:, ch-1] = noise\n", " sounds['noise'] = empty\n", " \n", " # all other sounds\n", " for key, snd in cfg['sounds'].items():\n", " if key == 'noise' or ('enabled' in snd and not snd['enabled']):\n", " continue # skip noise or unused sounds\n", " \n", " if 'harmonics' in snd and snd['harmonics']:\n", " tone = cls.get_harm_stack(snd['freq'], snd['duration'], sample_rate=cfg['sample_rate']) * cfg['volume']\n", " else:\n", " tone = cls.get_pure_tone(snd['freq'], snd['duration'], cfg['sample_rate']) * cfg['volume']\n", " tone = tone * cls.get_cos_window(tone, 0.01, cfg['sample_rate']) # onset / offset\n", " tone = tone * snd['amp'] # amplitude\n", " \n", " sound = np.zeros([len(tone), cfg['n_channels']], dtype='float32')\n", " for j in snd['channels']:\n", " sound[:, j-1] = tone\n", " \n", " sounds[key] = sound\n", "\n", " return sounds\n", " \n", " @classmethod\n", " def scale(cls, orig_s_rate, target_s_rate, orig_data):\n", " factor = target_s_rate / orig_s_rate\n", " x_orig = np.linspace(0, int(factor * len(orig_data)), len(orig_data))\n", " x_target = np.linspace(0, int(factor * len(orig_data)), int(factor * len(orig_data)))\n", " return np.interp(x_target, x_orig, orig_data)\n", " \n", " @classmethod\n", " def run(cls, selector, status, cfg, commutator):\n", " \"\"\"\n", " selector mp.Value object to set the sound to be played\n", " status mp.Value object to stop the loop\n", " \"\"\"\n", " import sounddevice as sd # must be inside the function\n", " import soundfile as sf\n", " import numpy as np\n", " import time\n", " \n", " # this is a continuous noise shit\n", " if cfg['cont_noise']['enabled']:\n", " #cont_noise_s_rate, cont_noise_data = wavfile.read(cfg['cont_noise']['filepath'])\n", " cont_noise_data, cont_noise_s_rate = sf.read(cfg['cont_noise']['filepath'], dtype='float32') # float32!!\n", " target_s_rate = cfg['sample_rate']\n", " orig_s_rate = cont_noise_s_rate\n", " if len(cont_noise_data.shape) > 1:\n", " orig_data = cont_noise_data[:, 0]\n", " else:\n", " orig_data = cont_noise_data\n", " cont_noise_target = cls.scale(orig_s_rate, target_s_rate, orig_data) * cfg['cont_noise']['amp']\n", " #cont_noise_target = cont_noise_data * cfg['cont_noise']['amp']\n", " c_noise_pointer = 0\n", " \n", " print(cfg['cont_noise']['amp'])\n", " \n", " # regular sounds\n", " sounds = cls.get_tone_stack(cfg)\n", "\n", " sd.default.device = cfg['device']\n", " sd.default.samplerate = cfg['sample_rate']\n", " stream = sd.OutputStream(samplerate=cfg['sample_rate'], channels=cfg['n_channels'], dtype='float32', blocksize=256)\n", " stream.start()\n", "\n", " next_beat = time.time() + cfg['latency']\n", " with open(cfg['file_path'], 'w') as f:\n", " f.write(\"time,id\\n\")\n", "\n", " while status.value > 0:\n", " if status.value == 2 or (status.value == 1 and selector.value == -1): # running state or masking noise\n", " t0 = time.time()\n", " if t0 < next_beat:\n", " time.sleep(0.0005) # not to spin the wheels too much\n", " if stream.write_available > sounds['silence'].shape[0]:\n", " block_to_write = sounds['silence'].copy() # 2D matrix time x channels\n", " if cfg['cont_noise']['enabled']:\n", " if c_noise_pointer + block_to_write.shape[0] > len(cont_noise_target):\n", " c_noise_pointer = 0\n", " cont_noise_block = cont_noise_target[c_noise_pointer:c_noise_pointer + block_to_write.shape[0]]\n", " for ch in cfg['cont_noise']['channels']:\n", " block_to_write[:, ch-1] += cont_noise_block\n", " c_noise_pointer += block_to_write.shape[0]\n", "\n", " stream.write(block_to_write) # silence\n", " continue\n", "\n", " roving = 10**((np.random.rand() * cfg['roving'] - cfg['roving']/2.0)/20.)\n", " roving = roving if int(selector.value) > -1 else 1 # no roving for noise\n", " block_to_write = sounds[commutator[int(selector.value)]] * roving # this is a 2D time x channels\n", "\n", " if cfg['cont_noise']['enabled']:\n", " if c_noise_pointer + block_to_write.shape[0] > len(cont_noise_target):\n", " c_noise_pointer = 0\n", " cont_noise_block = cont_noise_target[c_noise_pointer:c_noise_pointer + block_to_write.shape[0]]\n", " for ch in cfg['cont_noise']['channels']:\n", " block_to_write[:, ch-1] += cont_noise_block\n", " c_noise_pointer += block_to_write.shape[0]\n", " \n", " stream.write(block_to_write)\n", " \n", " if status.value == 2:\n", " with open(cfg['file_path'], 'a') as f:\n", " f.write(\",\".join([str(x) for x in (t0, selector.value)]) + \"\\n\")\n", "\n", " next_beat += cfg['latency']\n", " \n", " #if stream.write_available > 2:\n", " # stream.write(sounds['silence']) # silence\n", " \n", " else: # idle state\n", " next_beat = time.time() + cfg['latency']\n", " time.sleep(0.005)\n", " \n", " stream.stop()\n", " stream.close()\n", " print('Sound stopped')\n", "\n", " \n", "class ContinuousSoundStream:\n", " \n", " default_cfg = {\n", " 'wav_file': os.path.join('..', 'assets', 'stream1.wav'),\n", " 'chunk_duration': 20,\n", " 'chunk_offset': 2\n", " }\n", " \n", " def __init__(self, cfg):\n", " from scipy.io import wavfile\n", " import sounddevice as sd\n", "\n", " self.cfg = cfg\n", " self.stopped = False\n", " self.samplerate, self.data = wavfile.read(cfg['wav_file'])\n", " self.stream = sd.OutputStream(samplerate=self.samplerate, channels=2, dtype=self.data.dtype)\n", "\n", " def start(self):\n", " self._th = threading.Thread(target=self.update, args=())\n", " self._th.start()\n", "\n", " def stop(self):\n", " self.stopped = True\n", " self._th.join()\n", " print('Continuous sound stream released')\n", " \n", " def update(self):\n", " self.stream.start()\n", " print('Continuous sound stream started at %s Hz' % (self.samplerate))\n", " \n", " offset = int(self.cfg['chunk_offset'] * self.samplerate)\n", " chunk = int(self.cfg['chunk_duration'] * self.samplerate)\n", " \n", " while not self.stopped:\n", " start_idx = offset + np.random.randint(self.data.shape[0] - 2 * offset - chunk)\n", " end_idx = start_idx + chunk\n", " self.stream.write(self.data[start_idx:end_idx])\n", " \n", " self.stream.stop()\n", " self.stream.close()\n", " \n", " \n", "class SoundControllerPR:\n", " \n", " default_cfg = {\n", " \"device\": [1, 26],\n", " \"n_channels\": 10,\n", " \"sounds\": {\n", " \"noise\": {\"amp\": 0.2, \"duration\": 2.0, \"channels\": [6, 8]},\n", " \"target\": {\"freq\": 660, \"amp\": 0.1, \"duration\": 2.0}, \n", " },\n", " \"sample_rate\": 44100,\n", " \"volume\": 0.7,\n", " \"file_path\": \"sounds.csv\"\n", " }\n", " \n", " def __init__(self, status, cfg):\n", " import sounddevice as sd # must be inside the function\n", " import numpy as np\n", " import time\n", "\n", " sd.default.device = cfg['device']\n", " sd.default.samplerate = cfg['sample_rate']\n", " self.stream = sd.OutputStream(samplerate=cfg['sample_rate'], channels=cfg['n_channels'], dtype='float32', blocksize=256)\n", " self.stream.start()\n", "\n", " self.timers = []\n", " self.status = status\n", " self.cfg = cfg\n", " \n", " # noise (not assigned to channels)\n", " filter_a = np.array([0.0075, 0.0225, 0.0225, 0.0075])\n", " filter_b = np.array([1.0000,-2.1114, 1.5768,-0.4053])\n", "\n", " noise = np.random.randn(int(cfg['sounds']['noise']['duration'] * cfg['sample_rate']))\n", " noise = lfilter(filter_a, filter_b, noise)\n", " noise = noise / np.abs(noise).max() * cfg['sounds']['noise']['amp']\n", " noise = noise.astype(np.float32)\n", "\n", " # target (not assigned to channels)\n", " sample_rate = cfg['sample_rate']\n", " target_cfg = cfg['sounds']['target']\n", "\n", " tone = SoundController.get_pure_tone(target_cfg['freq'], target_cfg['duration'], sample_rate=cfg['sample_rate'])\n", " tone = tone * SoundController.get_cos_window(tone, target_cfg['window'], sample_rate=cfg['sample_rate'])\n", "\n", " if target_cfg['number'] > 1:\n", " silence = np.zeros( int(target_cfg['iti'] * cfg['sample_rate']) )\n", " tone_with_iti = np.concatenate([tone, silence])\n", " target = np.concatenate([tone_with_iti for i in range(target_cfg['number'] - 1)])\n", " target = np.concatenate([target, tone])\n", " else:\n", " target = tone\n", " \n", " target = target * target_cfg['amp'] # amplitude\n", " \n", " #snd = cfg['sounds']['target']\n", " #target = SoundController.get_pure_tone(snd['freq'], snd['duration'], cfg['sample_rate']) * cfg['volume']\n", " #target = target * SoundController.get_cos_window(target, 0.01, cfg['sample_rate']) # onset / offset\n", " #target = target * snd['amp'] # amplitude\n", " \n", " self.sounds = {'noise': noise, 'target': target}\n", " \n", " def target(self, hd_angle):\n", " to_play = np.zeros((len(self.sounds['target']), self.cfg['n_channels']), dtype='float32')\n", " channel = random.choice(self.cfg['sounds']['target']['channels']) # random speaker!\n", " \n", " to_play[:, channel-1] = self.sounds['target']\n", " \n", " t0 = time.time()\n", " with open(self.cfg['file_path'], 'a') as f:\n", " f.write(\",\".join([str(x) for x in (t0, 2, channel)]) + \"\\n\")\n", " \n", " self.stream.write(to_play)\n", " \n", " def noise(self):\n", " to_play = np.zeros((len(self.sounds['noise']), self.cfg['n_channels']), dtype='float32')\n", " for ch in self.cfg['sounds']['noise']['channels']:\n", " to_play[:, ch-1] = self.sounds['noise']\n", " \n", " ch1 = self.cfg['sounds']['noise']['channels'][0]\n", " t0 = time.time()\n", " with open(self.cfg['file_path'], 'a') as f:\n", " f.write(\",\".join([str(x) for x in (t0, -1, ch1)]) + \"\\n\")\n", " \n", " self.stream.write(to_play)\n", " \n", " def play_non_blocking(self, sound_id, hd_angle=0):\n", " if sound_id == 'target':\n", " tf = threading.Timer(0, self.target, args=[hd_angle])\n", " elif sound_id == 'noise':\n", " tf = threading.Timer(0, self.noise, args=[])\n", " tf.start()\n", " self.timers.append(tf)\n", " \n", " def stop(self):\n", " for t in self.timers:\n", " t.cancel()\n", " self.stream.stop()\n", " self.stream.close()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sound import SoundController\n", "import numpy as np\n", "\n", "sample_rate = 96000\n", "target_cfg = {\"freq\": 660, \"amp\": 0.1, \"duration\": 0.1, \"window\": 0.005, \"iti\": 0.1, \"number\": 7}\n", "\n", "tone = SoundController.get_pure_tone(target_cfg['freq'], target_cfg['duration'], sample_rate=sample_rate)\n", "tone = tone * SoundController.get_cos_window(tone, target_cfg['window'], sample_rate=sample_rate)\n", "\n", "if target_cfg['number'] > 1:\n", " silence = np.zeros( int(target_cfg['iti'] * sample_rate) )\n", " tone_with_iti = np.concatenate([tone, silence])\n", " pulses = np.concatenate([tone_with_iti for i in range(target_cfg['number'] - 1)])\n", " pulses = np.concatenate([pulses, tone])\n", "else:\n", " pulses = tone\n", "\n", "import matplotlib.pyplot as plt\n", "plt.plot(pulses)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Testing sound controller for Precedence project" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import numpy as np\n", "import time, os\n", "from sound import SoundControllerPR" ] }, { "cell_type": "code", "execution_count": 89, "metadata": {}, "outputs": [], "source": [ "sound_cfg = {\n", " \"device\": [1, 24],\n", " \"n_channels\": 14,\n", " \"sounds\": {\n", " \"noise\": {\"amp\": 0.01, \"duration\": 10.0, \"channels\": [6]}, # 69 DBA (amp 0.01, volume 0.1)\n", " \"target\": {\"freq\": 660, \"amp\": 0.3, \"duration\": 0.015, \"window\": 0.005, \"iti\": 0.1, \"number\": 10, \"channels\": [5]}\n", " },\n", " \"sample_rate\": 44100,\n", " \"volume\": 0.1,\n", " \"file_path\": \"sounds.csv\"\n", "}" ] }, { "cell_type": "code", "execution_count": 87, "metadata": {}, "outputs": [], "source": [ "sc = SoundControllerPR(1, sound_cfg)" ] }, { "cell_type": "code", "execution_count": 88, "metadata": {}, "outputs": [], "source": [ "sc.noise() # 5 is 3\n", "#sc.play_non_blocking('noise', 0)" ] }, { "cell_type": "code", "execution_count": 85, "metadata": {}, "outputs": [], "source": [ "sc.stop()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Building sound stack" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import numpy as np\n", "import time, os\n", "from sound import SoundController\n", "\n", "cfg = SoundController.default_cfg\n", "sounds = SoundController.get_tone_stack(cfg)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import matplotlib.pyplot as plt\n", "\n", "_ = plt.plot(sounds['noise'])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Roving and onset window" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sound import SoundController\n", "\n", "duration = 0.05\n", "freq = 440\n", "\n", "tone = SoundController.get_pure_tone(freq, duration)\n", "tone = tone * SoundController.get_cos_window(tone, 0.01)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import matplotlib.pyplot as plt\n", "plt.plot(tone)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Building noise" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import numpy as np\n", "from scipy.signal import lfilter\n", "\n", "sample_rate = 44100\n", "\n", "filter_a = np.array([0.0075, 0.0225, 0.0225, 0.0075])\n", "filter_b = np.array([1.0000,-2.1114, 1.5768,-0.4053])\n", "\n", "noise = np.random.randn(3*sample_rate)\n", "noise = lfilter(filter_a, filter_b, noise)\n", "noise = noise / np.abs(noise).max() * 0.5\n", "noise = noise.astype(np.float32)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": false }, "outputs": [], "source": [ "import matplotlib.pyplot as plt\n", "\n", "plt.plot(noise)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Testing sound pulses without MP" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import numpy as np\n", "import time, os\n", "import multiprocess as mp\n", "from sound import SoundController\n", "\n", "# sound selector: 0 - silence, 1 - tone 1, 2 - tone 2\n", "selector = mp.Value('i', -1)\n", "\n", "# loggin status: 1 - idle, 2 - running, 0 - stopped\n", "status = mp.Value('i', 2)\n", "\n", "cfg = SoundController.default_cfg\n", "#cfg['device'] = [1, 26] # 'M-Audio Delta ASIO'\n", "SoundController.run(selector, status, cfg)\n", "\n", "# nothing happens for a second\n", "time.sleep(1)\n", "\n", "status.value = 2\n", "for i in range(2):\n", " time.sleep(1)\n", " selector.value = -1 if selector.value == 2 else 2\n", "\n", "# stop\n", "status.value = 0\n", "time.sleep(0.2)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Testing sound pulses with MP" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import numpy as np\n", "import time, os\n", "import multiprocess as mp\n", "from sound import SoundController\n", "\n", "# sound selector: 0 - silence, 1 - tone 1, 2 - tone 2\n", "selector = mp.Value('i', -1)\n", "\n", "# loggin status: 1 - idle, 2 - running, 0 - stopped\n", "status = mp.Value('i', 1)\n", "\n", "cfg = SoundController.default_cfg\n", "cfg['device'] = [1, 26] # 'M-Audio Delta ASIO'\n", "sc = mp.Process(target=SoundController.run, args=(selector, status, cfg))\n", "sc.start()\n", "\n", "# nothing happens for a second\n", "time.sleep(1)\n", "\n", "status.value = 2\n", "for i in range(3):\n", " time.sleep(1)\n", " selector.value = -1 if selector.value == 1 else 1\n", "\n", "# stop\n", "status.value = 0\n", "time.sleep(0.2)\n", "sc.join()" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": false }, "outputs": [], "source": [ "import matplotlib.pyplot as plt\n", "\n", "ds = np.loadtxt('test_sound_log.csv', delimiter=',', skiprows=1)\n", "plt.plot(np.diff(ds[:, 0]))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Sounddevice playground" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import sounddevice as sd\n", "[(i, x) for i, x in enumerate(sd.query_devices()) if x['name'].find('ASIO') > 0]" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import sounddevice as sd\n", "import numpy as np\n", "#import keyboard # using module keyboard\n", "\n", "sd.default.device = [1, 24]\n", "sd.default.samplerate = 44100\n", "stream = sd.OutputStream(samplerate=44100, channels=14, dtype='float32')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# 3rd channel - left arena speaker\n", "# 1st channel - right arena speaker" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "duration = 2.5\n", "\n", "x1 = np.linspace(0, duration * 220 * 2*np.pi, int(duration*44100), dtype=np.float32)\n", "x2 = np.linspace(0, duration * 440 * 2*np.pi, int(duration*44100), dtype=np.float32)\n", "y1 = np.sin(x1)\n", "y2 = np.sin(x2)\n", "sil = np.zeros(len(x1), dtype=np.float32)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "stream.start()\n", "stream.write(np.column_stack([sil, sil, sil, sil, sil, sil, sil, sil, sil, y1, sil, sil, sil, sil]) * 0.8)\n", "stream.stop()\n", "\n", "# [None, None, 1, 2, 3, 4, 5, 6, 7, 8, None, None, None, None]" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "stream.start()\n", "\n", "try:\n", " while True: # making a loop\n", " if keyboard.is_pressed('q'): # if key 'q' is pressed \n", " break # finishing the loop\n", "\n", " stream.write(np.column_stack([y1, sil, sil, sil, sil, sil, sil, sil, sil, sil]) * 0.8)\n", " #stream.write(np.column_stack([y2, y2, y2, y2, y2, y2, y2, y2, y2, y2]) * 0.8)\n", " \n", "finally:\n", " stream.stop()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Permanent sound in a separate audio stream" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import sounddevice as sd\n", "import numpy as np\n", "import os\n", "import threading\n", "from scipy.io import wavfile" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": true }, "outputs": [], "source": [ "sd.query_devices()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "wav_fname = os.path.join('..', 'assets', 'stream1.wav')\n", "samplerate, data = wavfile.read(wav_fname)\n", "\n", "print(samplerate, data.shape, data.dtype)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "stream = sd.OutputStream(samplerate=samplerate, channels=2, dtype=data.dtype)\n", "\n", "stream.start()\n", "\n", "for i in range(5):\n", " stream.write(data[50000:100000])\n", " print('playing')\n", " \n", "stream.stop()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "cfg = ContinuousSoundStream.default_cfg\n", "\n", "cst = ContinuousSoundStream(cfg)\n", "cst.start()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "cst.stop()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.8" } }, "nbformat": 4, "nbformat_minor": 4 }