123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162 |
- import logging
- import os
- import sys
- import time
- import cerebus.cbpy as cb
- import numpy as np
- from aux import log
- class Daq:
- """
- Description: This class contains all methods and variables related to
- communication with the NSP.
- """
- def __init__(self):
- """
- Opens the connection to the NSP and returns error if NSP is offline.
- Writes the connection parameters to the params
- (see Data class for info on params).
- """
- log.info("Initializing Daq class")
- log.info("Opening cbpy connection to NSP...")
- # Try opening a connection
- res = cb.open()
- if res[0] < 0:
- log.critical("failed to open cbpy connection with NSP")
- time.sleep(1)
- # If success, return result and add connection parameters to params
- if res[0] >= 0:
- log.info("Success! NSP Initialized")
- # Save the connection parameters to params
- self.instance = res[0]
- self.connection = res[1]['connection']
- self.instrument = res[1]['instrument']
- elif res[0] < 0:
- log.warning("Could not open cbpy connection to NSP... retrying")
- self.close_connection()
- time.sleep(0.5)
- res = cb.open()
- if res[0] < 0:
- log.critical("NSP not online / misbehaving. Please check if reboot NSP.")
- sys.exit(1)
- return None
- def close_connection(self):
- """
- Close the connection to the NSP. Stop running BCI.
- """
- res_close = cb.close()
- if res_close >= 0:
- log.info("cbpy connection to NSP closed.")
- return None
- def test_channel_config(self, channels=range(1, 97), group=5):
- """Check if NSP is broadcasting the queried channels at the right srate"""
- res = cb.trial_config(reset=True, instance=self.instance)
- time.sleep(0.2)
- # data = cb.trial_continuous()
- # nChansBroadcast = len(data[1])
- # print(nChansBroadcast)
- extraChans = np.setdiff1d(list(range(1, 97)), list(channels))
- if len(extraChans) > 0:
- log.info("Checking unconfigured channels")
- for channel in extraChans:
- c = cb.get_channel_config(channel)
- if c[1]['smpgroup'] != 0:
- res = cb.set_channel_config(channel, chaninfo={'smpgroup':0})
- if res == 0:
- log.info("turned of channel {}".format(channel))
- time.sleep(0.05)
- count = self.validate_channels_srate(channels, group)
- if count > 0:
- # Configure the channels and start recording
- log.info("Assigning channels: {} to group: {}".format(list(channels), group))
- self.configure_channels(channels, group)
- # time.sleep(0.2)
- count = self.validate_channels_srate(channels, group)
- if count > 0:
- log.error("Failed to configure {} channels. Restart NSP.".format(count))
- res = self.close_connection()
- return None
- def validate_channels_srate(self, channels=range(1, 97), group=5):
- cb.trial_config()
- time.sleep(0.2)
- data = cb.trial_continuous(reset=True)
- """ Check if channels have been configured"""
- count = 0
- for channel in channels:
- time.sleep(0.2)
- # assert data[1][channel - 1][2] == 30000, 'channel no %r not configured correctly' % channel
- c = cb.get_channel_config(channel, instance=self.instance)
- if c[1]['smpgroup'] != group:
- count += 1
- log.warning("Channel {} is in {} instead of {}".format(count, c[1]['smpgroup'], group))
- if count == 0:
- log.info("{} channels are in group {}".format(len(list(channels)), group))
- else:
- log.info("{} channels are not in group ".format(count, group))
- # print("Please restart nPlayServer to reset channels")
- return count
- def configure_channels(self, channels=list(range(1, 97)), group=5):
- """
- Configures 96 channels at 30k
- inputs:
- channels: List of channels
- Default: [1:96]
- sampling_group: The sampling group to assign to the channel.
- They are 1 to 5 and defined as follows:
- 1 = 0.5Khz, 2 = 1Khz, 3 = 2Khz, 4 = 10Khz, 5 =
- Example:
- To set channels 5, 6 and 7 use at 30k use:
- >>> channels = [5, 6, 7]
- >>> daq.configure_channels(channels, 5)
- Currently, turning channels off is not supported.
- """
- for channel in channels:
- res = cb.set_channel_config(channel, chaninfo={'smpgroup': group}, instance=self.instance)
- time.sleep(0.2)
- if res != 0:
- log.warning("failed for channel {}".format(channel))
- elif res == 0:
- log.info("Assigned ch{} to group {}".format(channel, group))
- return None
- if __name__ == '__main__':
- daq = Daq()
- time.sleep(0.5)
- res = daq.test_channel_config(range(1, 11), group=5)
|