|
- """
- Class for "reading" data from Neuroshare compatible files (check neuroshare.org)
- It runs through the whole file and searches for: analog signals, spike cutouts,
- and trigger events (without duration)
- Depends on: Neuroshare API 0.9.1, numpy 1.6.1, quantities 0.10.1
- Supported: Read
- Author: Andre Maia Chagas
- """
- # note neo.core needs only numpy and quantities
- import numpy as np
- import quantities as pq
- import os
- # check to see if the neuroshare bindings are properly imported
- try:
- import neuroshare as ns
- except ImportError as err:
- print(err)
- # print('\n neuroshare library not found, loading data will not work!' )
- # print('\n be sure to install the library found at:')
- # print('\n www.http://pythonhosted.org/neuroshare/')
- else:
- pass
- # print('neuroshare library successfully imported')
- # import BaseIO
- from neo.io.baseio import BaseIO
- # import objects from neo.core
- from neo.core import Segment, AnalogSignal, SpikeTrain, Event, Epoch
- # create an object based on BaseIO
- class NeuroshareapiIO(BaseIO):
- # setting some class parameters
- is_readable = True # This class can only read data
- is_writable = False # write is not supported
- supported_objects = [Segment, AnalogSignal, SpikeTrain, Event, Epoch]
- has_header = False
- is_streameable = False
- readable_objects = [Segment, AnalogSignal, SpikeTrain, Event, Epoch]
- # This class is not able to write objects
- writeable_objects = []
- # # This is for GUI stuff : a definition for parameters when reading.
- # # This dict should be keyed by object (`Block`). Each entry is a list
- # # of tuple. The first entry in each tuple is the parameter name. The
- # # second entry is a dict with keys 'value' (for default value),
- # # and 'label' (for a descriptive name).
- # # Note that if the highest-level object requires parameters,
- # # common_io_test will be skipped.
- read_params = {
- Segment: [
- ("segment_duration", {"value": 0., "label": "Segment size (s.)"}),
- ("t_start", {"value": 0., "label": "start reading (s.)"}),
- # ("num_analogsignal",
- # {'value" : 8, "label" : "Number of recording points"}),
- # ("num_spiketrain_by_channel',
- # {"value" : 3, "label" : "Num of spiketrains"}),
- ],
- }
- #
- # do not supported write so no GUI stuff
- write_params = None
- name = "Neuroshare"
- extensions = []
- # This object operates on neuroshare files
- mode = "file"
- def __init__(self, filename=None, dllpath=None):
- """
- Arguments:
- filename : the filename
- The init function will run automatically upon calling of the class, as
- in: test = MultichannelIO(filename = filetoberead.mcd), therefore the first
- operations with the file are set here, so that the user doesn't have to
- remember to use another method, than the ones defined in the NEO library
- """
- BaseIO.__init__(self)
- self.filename = filename
- # set the flags for each event type
- eventID = 1
- analogID = 2
- epochID = 3
- # if a filename was given, create a dictionary with information that will
- # be needed later on.
- if self.filename is not None:
- if dllpath is not None:
- name = os.path.splitext(os.path.basename(dllpath))[0]
- library = ns.Library(name, dllpath)
- else:
- library = None
- self.fd = ns.File(self.filename, library=library)
- # get all the metadata from file
- self.metadata = self.fd.metadata_raw
- # get sampling rate
- self.metadata["sampRate"] = 1. / self.metadata["TimeStampResolution"] # hz
- # create lists and array for electrode, spike cutouts and trigger channels
- self.metadata["elecChannels"] = list()
- self.metadata["elecChanId"] = list()
- self.metadata["num_analogs"] = 0
- self.metadata["spkChannels"] = list()
- self.metadata["spkChanId"] = list()
- self.metadata["num_spkChans"] = 0
- self.metadata["triggers"] = list()
- self.metadata["triggersId"] = list()
- self.metadata["num_trigs"] = 0
- self.metadata["digital epochs"] = list()
- self.metadata["digiEpochId"] = list()
- self.metadata["num_digiEpochs"] = 0
- # loop through all entities in file to get the indexes for each entity
- # type, so that one can run through the indexes later, upon reading the
- # segment
- for entity in self.fd.entities:
- # if entity is analog and not the digital line recording
- # (stored as analog in neuroshare files)
- if entity.entity_type == analogID and entity.label[0:4] != "digi":
- # get the electrode number
- self.metadata["elecChannels"].append(entity.label[-4:])
- # get the electrode index
- self.metadata["elecChanId"].append(entity.id)
- # increase the number of electrodes found
- self.metadata["num_analogs"] += 1
- # if the entity is a event entitiy and a trigger
- if entity.entity_type == eventID and entity.label[0:4] == "trig":
- # get the digital bit/trigger number
- self.metadata["triggers"].append(entity.label[0:4] + entity.label[-4:])
- # get the digital bit index
- self.metadata["triggersId"].append(entity.id)
- # increase the number of triggers found
- self.metadata["num_trigs"] += 1
- # if the entity is non triggered digital values with duration
- if entity.entity_type == eventID and entity.label[0:4] == "digi":
- # get the digital bit number
- self.metadata["digital epochs"].append(entity.label[-5:])
- # get the digital bit index
- self.metadata["digiEpochId"].append(entity.id)
- # increase the number of triggers found
- self.metadata["num_digiEpochs"] += 1
- # if the entity is spike cutouts
- if entity.entity_type == epochID and entity.label[0:4] == "spks":
- self.metadata["spkChannels"].append(entity.label[-4:])
- self.metadata["spkChanId"].append(entity.id)
- self.metadata["num_spkChans"] += 1
- # function to create a block and read in a segment
- # def create_block(self,
- #
- # ):
- #
- # blk=Block(name = self.fileName+"_segment:",
- # file_datetime = str(self.metadata_raw["Time_Day"])+"/"+
- # str(self.metadata_raw["Time_Month"])+"/"+
- # str(self.metadata_raw["Time_Year"])+"_"+
- # str(self.metadata_raw["Time_Hour"])+":"+
- # str(self.metadata_raw["Time_Min"]))
- #
- # blk.rec_datetime = blk.file_datetime
- # return blk
- # create function to read segment
- def read_segment(self,
- lazy=False,
- # all following arguments are decided by this IO and are free
- t_start=0.,
- segment_duration=0.,
- ):
- """
- Return a Segment containing all analog and spike channels, as well as
- all trigger events.
- Parameters:
- segment_duration :is the size in secend of the segment.
- num_analogsignal : number of AnalogSignal in this segment
- num_spiketrain : number of SpikeTrain in this segment
- """
- assert not lazy, 'Do not support lazy'
- # if no segment duration is given, use the complete file
- if segment_duration == 0.:
- segment_duration = float(self.metadata["TimeSpan"])
- # if the segment duration is bigger than file, use the complete file
- if segment_duration >= float(self.metadata["TimeSpan"]):
- segment_duration = float(self.metadata["TimeSpan"])
- # if the time sum of start point and segment duration is bigger than
- # the file time span, cap it at the end
- if segment_duration + t_start > float(self.metadata["TimeSpan"]):
- segment_duration = float(self.metadata["TimeSpan"]) - t_start
- # create an empty segment
- seg = Segment(name="segment from the NeuroshareapiIO")
- # read nested analosignal
- if self.metadata["num_analogs"] == 0:
- print("no analog signals in this file!")
- else:
- # run through the number of analog channels found at the __init__ function
- for i in range(self.metadata["num_analogs"]):
- # create an analog signal object for each channel found
- ana = self.read_analogsignal(channel_index=self.metadata["elecChanId"][i],
- segment_duration=segment_duration, t_start=t_start)
- # add analog signal read to segment object
- seg.analogsignals += [ana]
- # read triggers (in this case without any duration)
- for i in range(self.metadata["num_trigs"]):
- # create event object for each trigger/bit found
- eva = self.read_event(channel_index=self.metadata["triggersId"][i],
- segment_duration=segment_duration,
- t_start=t_start, )
- # add event object to segment
- seg.events += [eva]
- # read epochs (digital events with duration)
- for i in range(self.metadata["num_digiEpochs"]):
- # create event object for each trigger/bit found
- epa = self.read_epoch(channel_index=self.metadata["digiEpochId"][i],
- segment_duration=segment_duration,
- t_start=t_start, )
- # add event object to segment
- seg.epochs += [epa]
- # read nested spiketrain
- # run through all spike channels found
- for i in range(self.metadata["num_spkChans"]):
- # create spike object
- sptr = self.read_spiketrain(channel_index=self.metadata["spkChanId"][i],
- segment_duration=segment_duration,
- t_start=t_start)
- # add the spike object to segment
- seg.spiketrains += [sptr]
- seg.create_many_to_one_relationship()
- return seg
- """
- With this IO AnalogSignal can be accessed directly with its channel number
- """
- def read_analogsignal(self,
- lazy=False,
- # channel index as given by the neuroshare API
- channel_index=0,
- # time in seconds to be read
- segment_duration=0.,
- # time in seconds to start reading from
- t_start=0.,
- ):
- assert not lazy, 'Do not support lazy'
- # some controls:
- # if no segment duration is given, use the complete file
- if segment_duration == 0.:
- segment_duration = float(self.metadata["TimeSpan"])
- # if the segment duration is bigger than file, use the complete file
- if segment_duration >= float(self.metadata["TimeSpan"]):
- segment_duration = float(self.metadata["TimeSpan"])
- # get the analog object
- sig = self.fd.get_entity(channel_index)
- # get the units (V, mV etc)
- sigUnits = sig.units
- # get the electrode number
- chanName = sig.label[-4:]
- # transform t_start into index (reading will start from this index)
- startat = int(t_start * self.metadata["sampRate"])
- # get the number of bins to read in
- bins = int(segment_duration * self.metadata["sampRate"])
- # if the number of bins to read is bigger than
- # the total number of bins, read only till the end of analog object
- if startat + bins > sig.item_count:
- bins = sig.item_count - startat
- # read the data from the sig object
- sig, _, _ = sig.get_data(index=startat, count=bins)
- # store it to the 'AnalogSignal' object
- anasig = AnalogSignal(sig, units=sigUnits, sampling_rate=self.metadata["sampRate"] * pq.Hz,
- t_start=t_start * pq.s,
- t_stop=(t_start + segment_duration) * pq.s,
- channel_index=channel_index)
- # annotate from which electrode the signal comes from
- anasig.annotate(info="signal from channel %s" % chanName)
- return anasig
- # function to read spike trains
- def read_spiketrain(self,
- lazy=False,
- channel_index=0,
- segment_duration=0.,
- t_start=0.):
- """
- Function to read in spike trains. This API still does not support read in of
- specific channels as they are recorded. rather the fuunction gets the entity set
- by 'channel_index' which is set in the __init__ function (all spike channels)
- """
- assert not lazy, 'Do not support lazy'
- # sampling rate
- sr = self.metadata["sampRate"]
- # create a list to store spiketrain times
- times = list()
- # get the spike data from a specific channel index
- tempSpks = self.fd.get_entity(channel_index)
- # transform t_start into index (reading will start from this index)
- startat = tempSpks.get_index_by_time(t_start, 0) # zero means closest index to value
- # get the last index to read, using segment duration and t_start
- # -1 means last index before time
- endat = tempSpks.get_index_by_time(float(segment_duration + t_start), -1)
- numIndx = endat - startat
- # get the end point using segment duration
- # create a numpy empty array to store the waveforms
- waveforms = np.array(np.zeros([numIndx, tempSpks.max_sample_count]))
- # loop through the data from the specific channel index
- for i in range(startat, endat, 1):
- # get cutout, timestamp, cutout duration, and spike unit
- tempCuts, timeStamp, duration, unit = tempSpks.get_data(i)
- # save the cutout in the waveform matrix
- waveforms[i] = tempCuts[0]
- # append time stamp to list
- times.append(timeStamp)
- # create a spike train object
- spiketr = SpikeTrain(times, units=pq.s,
- t_stop=t_start + segment_duration,
- t_start=t_start * pq.s,
- name="spikes from electrode" + tempSpks.label[-3:],
- waveforms=waveforms * pq.volt,
- sampling_rate=sr * pq.Hz,
- file_origin=self.filename,
- annotate=("channel_index:" + str(channel_index)))
- return spiketr
- def read_event(self, lazy=False, channel_index=0,
- t_start=0.,
- segment_duration=0.):
- """function to read digital timestamps. this function only reads the event
- onset. to get digital event durations, use the epoch function (to be implemented)."""
- assert not lazy, 'Do not support lazy'
- # create temporary empty lists to store data
- tempNames = list()
- tempTimeStamp = list()
- # get entity from file
- trigEntity = self.fd.get_entity(channel_index)
- # transform t_start into index (reading will start from this index)
- startat = trigEntity.get_index_by_time(t_start, 0) # zero means closest index to value
- # get the last index to read, using segment duration and t_start
- endat = trigEntity.get_index_by_time(
- float(segment_duration + t_start), -1) # -1 means last index before time
- # numIndx = endat-startat
- # run through specified intervals in entity
- for i in range(startat, endat + 1, 1): # trigEntity.item_count):
- # get in which digital bit was the trigger detected
- tempNames.append(trigEntity.label[-8:])
- # get the time stamps of onset events
- tempData, onOrOff = trigEntity.get_data(i)
- # if this was an onset event, save it to the list
- # on triggered recordings it seems that only onset events are
- # recorded. On continuous recordings both onset(==1)
- # and offset(==255) seem to be recorded
- if onOrOff == 1:
- # append the time stamp to them empty list
- tempTimeStamp.append(tempData)
- # create an event array
- eva = Event(labels=np.array(tempNames, dtype="U"),
- times=np.array(tempTimeStamp) * pq.s,
- file_origin=self.filename,
- description="the trigger events (without durations)")
- return eva
- def read_epoch(self, lazy=False,
- channel_index=0,
- t_start=0.,
- segment_duration=0.):
- """function to read digital timestamps. this function reads the event
- onset and offset and outputs onset and duration. to get only onsets use
- the event array function"""
- assert not lazy, 'Do not support lazy'
- # create temporary empty lists to store data
- tempNames = list()
- tempTimeStamp = list()
- durations = list()
- # get entity from file
- digEntity = self.fd.get_entity(channel_index)
- # transform t_start into index (reading will start from this index)
- startat = digEntity.get_index_by_time(t_start, 0) # zero means closest index to value
- # get the last index to read, using segment duration and t_start
- # -1 means last index before time
- endat = digEntity.get_index_by_time(float(segment_duration + t_start), -1)
- # run through entity using only odd "i"s
- for i in range(startat, endat + 1, 1):
- if i % 2 == 1:
- # get in which digital bit was the trigger detected
- tempNames.append(digEntity.label[-8:])
- # get the time stamps of even events
- tempData, onOrOff = digEntity.get_data(i - 1)
- # if this was an onset event, save it to the list
- # on triggered recordings it seems that only onset events are
- # recorded. On continuous recordings both onset(==1)
- # and offset(==255) seem to be recorded
- # if onOrOff == 1:
- # append the time stamp to them empty list
- tempTimeStamp.append(tempData)
- # get time stamps of odd events
- tempData1, onOrOff = digEntity.get_data(i)
- # if onOrOff == 255:
- # pass
- durations.append(tempData1 - tempData)
- epa = Epoch(file_origin=self.filename,
- times=np.array(tempTimeStamp) * pq.s,
- durations=np.array(durations) * pq.s,
- labels=np.array(tempNames, dtype="U"),
- description="digital events with duration")
- return epa
|