Scheduled service maintenance on November 22


On Friday, November 22, 2024, between 06:00 CET and 18:00 CET, GIN services will undergo planned maintenance. Extended service interruptions should be expected. We will try to keep downtimes to a minimum, but recommend that users avoid critical tasks, large data uploads, or DOI requests during this time.

We apologize for any inconvenience.

sound.py 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. import numpy as np
  2. import time
  3. from scipy.signal import lfilter
  4. class SoundController:
  5. # https://python-sounddevice.readthedocs.io/en/0.3.15/api/streams.html#sounddevice.OutputStream
  6. default_cfg = {
  7. "device": [1, 26],
  8. "n_channels": 4,
  9. "sounds": [
  10. {"freq": 460, "amp": 0.13, "channels": [1, 3]},
  11. {"freq": 660, "amp": 0.05, "channels": [1, 3]},
  12. {"freq": 860, "amp": 0.15, "channels": [1, 3]},
  13. {"freq": 1060, "amp": 0.25, "channels": [1, 3]},
  14. {"freq": 1320, "amp": 0.2, "channels": [1, 3]},
  15. {"freq": 20000, "amp": 0.55, "channels": [1, 3]}
  16. ],
  17. "pulse_duration": 0.05,
  18. "sample_rate": 44100,
  19. "latency": 0.25,
  20. "volume": 0.7,
  21. "roving": 5.0,
  22. "file_path": "sounds.csv"
  23. }
  24. @classmethod
  25. def get_pure_tone(cls, freq, duration, sample_rate=44100):
  26. x = np.linspace(0, duration * freq * 2*np.pi, int(duration*sample_rate), dtype=np.float32)
  27. return np.sin(x)
  28. @classmethod
  29. def get_cos_window(cls, tone, win_duration, sample_rate=44100):
  30. x = np.linspace(0, np.pi/2, int(win_duration * sample_rate), dtype=np.float32)
  31. onset = np.sin(x)
  32. middle = np.ones(len(tone) - 2 * len(x))
  33. offset = np.cos(x)
  34. return np.concatenate([onset, middle, offset])
  35. @classmethod
  36. def get_tone_stack(cls, cfg):
  37. # silence
  38. silence = np.zeros(2, dtype='float32')
  39. sounds = {0: np.column_stack([silence for x in range(cfg['n_channels'])])}
  40. # noise
  41. filter_a = np.array([0.0075, 0.0225, 0.0225, 0.0075])
  42. filter_b = np.array([1.0000,-2.1114, 1.5768,-0.4053])
  43. noise = np.random.randn(int(0.25 * cfg['sample_rate'])) # 250ms of noise
  44. noise = lfilter(filter_a, filter_b, noise)
  45. noise = noise / np.abs(noise).max() * 0.5
  46. noise = noise.astype(np.float32)
  47. sounds[-1] = np.column_stack([noise for x in range(cfg['n_channels'])])
  48. # all other sounds
  49. for i, snd in enumerate(cfg['sounds']):
  50. tone = cls.get_pure_tone(snd['freq'], cfg['pulse_duration'], cfg['sample_rate']) * cfg['volume']
  51. tone = tone * cls.get_cos_window(tone, 0.01, cfg['sample_rate']) # onset / offset
  52. tone = tone * snd['amp']
  53. sound = np.zeros([len(tone), cfg['n_channels']], dtype='float32')
  54. for j in snd['channels']:
  55. sound[:, j-1] = tone
  56. sounds[i + 1] = sound
  57. #sounds[i + 1] = np.column_stack((nothing, tone, tone, tone))
  58. return sounds
  59. @classmethod
  60. def run(cls, selector, status, cfg):
  61. """
  62. selector mp.Value object to set the sound to be played
  63. status mp.Value object to stop the loop
  64. """
  65. import sounddevice as sd # must be inside the function
  66. import numpy as np
  67. import time
  68. sounds = cls.get_tone_stack(cfg)
  69. sd.default.device = cfg['device']
  70. sd.default.samplerate = cfg['sample_rate']
  71. stream = sd.OutputStream(samplerate=cfg['sample_rate'], channels=4, dtype='float32', blocksize=256)
  72. stream.start()
  73. next_beat = time.time() + cfg['latency']
  74. with open(cfg['file_path'], 'w') as f:
  75. f.write("time,id\n")
  76. while status.value > 0:
  77. if status.value == 2: # running state
  78. t0 = time.time()
  79. if t0 < next_beat:
  80. #time.sleep(0.0001) # not to spin the wheels too much
  81. if stream.write_available > 2:
  82. stream.write(sounds[0]) # silence
  83. continue
  84. roving = 10**((np.random.rand() * cfg['roving'] - cfg['roving']/2.0)/20.)
  85. roving = roving if int(selector.value) > -1 else 1 # no roving for noise
  86. stream.write(sounds[int(selector.value)] * roving)
  87. with open(cfg['file_path'], 'a') as f:
  88. f.write(",".join([str(x) for x in (t0, selector.value)]) + "\n")
  89. next_beat += cfg['latency']
  90. if stream.write_available > 2:
  91. stream.write(sounds[0]) # silence
  92. else: # idle state
  93. next_beat = time.time() + cfg['latency']
  94. time.sleep(0.05)
  95. stream.stop()
  96. print('Sound stopped')