123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327 |
- # -*- coding: utf-8 -*-
- """
- Class for "reading" fake data from an imaginary file.
- For the user, it generates a :class:`Segment` or a :class:`Block` with a
- sinusoidal :class:`AnalogSignal`, a :class:`SpikeTrain` and an
- :class:`Event`.
- For a developer, it is just an example showing guidelines for someone who wants
- to develop a new IO module.
- Depends on: scipy
- Supported: Read
- Author: sgarcia
- """
- # needed for python 3 compatibility
- from __future__ import absolute_import
- # note neo.core needs only numpy and quantities
- import numpy as np
- import quantities as pq
- # but my specific IO can depend on many other packages
- try:
- from scipy import stats
- except ImportError as err:
- HAVE_SCIPY = False
- SCIPY_ERR = err
- else:
- HAVE_SCIPY = True
- SCIPY_ERR = None
- # I need to subclass BaseIO
- from neo.io.baseio import BaseIO
- # to import from core
- from neo.core import Segment, AnalogSignal, SpikeTrain, Event
- # I need to subclass BaseIO
- class ExampleIO(BaseIO):
- """
- Class for "reading" fake data from an imaginary file.
- For the user, it generates a :class:`Segment` or a :class:`Block` with a
- sinusoidal :class:`AnalogSignal`, a :class:`SpikeTrain` and an
- :class:`Event`.
- For a developer, it is just an example showing guidelines for someone who wants
- to develop a new IO module.
- Two rules for developers:
- * Respect the Neo IO API (:ref:`neo_io_API`)
- * Follow :ref:`io_guiline`
- Usage:
- >>> from neo import io
- >>> r = io.ExampleIO(filename='itisafake.nof')
- >>> seg = r.read_segment(lazy=False, cascade=True)
- >>> print(seg.analogsignals) # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE
- [<AnalogSignal(array([ 0.19151945, 0.62399373, 0.44149764, ..., 0.96678374,
- ...
- >>> print(seg.spiketrains) # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE
- [<SpikeTrain(array([ -0.83799524, 6.24017951, 7.76366686, 4.45573701,
- 12.60644415, 10.68328994, 8.07765735, 4.89967804,
- ...
- >>> print(seg.events) # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE
- [<Event: TriggerB@9.6976 s, TriggerA@10.2612 s, TriggerB@2.2777 s, TriggerA@6.8607 s, ...
- >>> anasig = r.read_analogsignal(lazy=True, cascade=False)
- >>> print(anasig._data_description)
- {'shape': (150000,)}
- >>> anasig = r.read_analogsignal(lazy=False, cascade=False)
- """
- is_readable = True # This class can only read data
- is_writable = False # write is not supported
- # This class is able to directly or indirectly handle the following objects
- # You can notice that this greatly simplifies the full Neo object hierarchy
- supported_objects = [ Segment , AnalogSignal, SpikeTrain, Event ]
- # This class can return either a Block or a Segment
- # The first one is the default ( self.read )
- # These lists should go from highest object to lowest object because
- # common_io_test assumes it.
- readable_objects = [ Segment , AnalogSignal, SpikeTrain ]
- # This class is not able to write objects
- writeable_objects = [ ]
- has_header = False
- is_streameable = False
- # This is for GUI stuff : a definition for parameters when reading.
- # This dict should be keyed by object (`Block`). Each entry is a list
- # of tuple. The first entry in each tuple is the parameter name. The
- # second entry is a dict with keys 'value' (for default value),
- # and 'label' (for a descriptive name).
- # Note that if the highest-level object requires parameters,
- # common_io_test will be skipped.
- read_params = {
- Segment : [
- ('segment_duration',
- {'value' : 15., 'label' : 'Segment size (s.)'}),
- ('num_analogsignal',
- {'value' : 8, 'label' : 'Number of recording points'}),
- ('num_spiketrain_by_channel',
- {'value' : 3, 'label' : 'Num of spiketrains'}),
- ],
- }
- # do not supported write so no GUI stuff
- write_params = None
- name = 'example'
- extensions = [ 'nof' ]
- # mode can be 'file' or 'dir' or 'fake' or 'database'
- # the main case is 'file' but some reader are base on a directory or a database
- # this info is for GUI stuff also
- mode = 'fake'
- def __init__(self , filename = None) :
- """
- Arguments:
- filename : the filename
- Note:
- - filename is here just for exampe because it will not be take in account
- - if mode=='dir' the argument should be dirname (See TdtIO)
- """
- BaseIO.__init__(self)
- self.filename = filename
- # Seed so all instances can return the same values
- np.random.seed(1234)
- # Segment reading is supported so I define this :
- def read_segment(self,
- # the 2 first keyword arguments are imposed by neo.io API
- lazy = False,
- cascade = True,
- # all following arguments are decied by this IO and are free
- segment_duration = 15.,
- num_analogsignal = 4,
- num_spiketrain_by_channel = 3,
- ):
- """
- Return a fake Segment.
- The self.filename does not matter.
- In this IO read by default a Segment.
- This is just a example to be adapted to each ClassIO.
- In this case these 3 paramters are taken in account because this function
- return a generated segment with fake AnalogSignal and fake SpikeTrain.
- Parameters:
- segment_duration :is the size in secend of the segment.
- num_analogsignal : number of AnalogSignal in this segment
- num_spiketrain : number of SpikeTrain in this segment
- """
- sampling_rate = 10000. #Hz
- t_start = -1.
- #time vector for generated signal
- timevect = np.arange(t_start, t_start+ segment_duration , 1./sampling_rate)
- # create an empty segment
- seg = Segment( name = 'it is a seg from exampleio')
- if cascade:
- # read nested analosignal
- for i in range(num_analogsignal):
- ana = self.read_analogsignal( lazy = lazy , cascade = cascade ,
- channel_index = i ,segment_duration = segment_duration, t_start = t_start)
- seg.analogsignals += [ ana ]
- # read nested spiketrain
- for i in range(num_analogsignal):
- for _ in range(num_spiketrain_by_channel):
- sptr = self.read_spiketrain(lazy = lazy , cascade = cascade ,
- segment_duration = segment_duration, t_start = t_start , channel_index = i)
- seg.spiketrains += [ sptr ]
- # create an Event that mimic triggers.
- # note that ExampleIO do not allow to acess directly to Event
- # for that you need read_segment(cascade = True)
- if lazy:
- # in lazy case no data are readed
- # eva is empty
- eva = Event()
- else:
- # otherwise it really contain data
- n = 1000
- # neo.io support quantities my vector use second for unit
- eva = Event(timevect[(np.random.rand(n)*timevect.size).astype('i')]* pq.s)
- # all duration are the same
- eva.durations = np.ones(n)*500*pq.ms # Event doesn't have durations. Is Epoch intended here?
- # label
- l = [ ]
- for i in range(n):
- if np.random.rand()>.6: l.append( 'TriggerA' )
- else : l.append( 'TriggerB' )
- eva.labels = np.array( l )
- seg.events += [ eva ]
- seg.create_many_to_one_relationship()
- return seg
- def read_analogsignal(self ,
- # the 2 first key arguments are imposed by neo.io API
- lazy = False,
- cascade = True,
- channel_index = 0,
- segment_duration = 15.,
- t_start = -1,
- ):
- """
- With this IO AnalogSignal can e acces directly with its channel number
- """
- sr = 10000.
- sinus_freq = 3. # Hz
- #time vector for generated signal:
- tvect = np.arange(t_start, t_start+ segment_duration , 1./sr)
- if lazy:
- anasig = AnalogSignal([], units='V', sampling_rate=sr * pq.Hz,
- t_start=t_start * pq.s,
- channel_index=channel_index)
- # we add the attribute lazy_shape with the size if loaded
- anasig.lazy_shape = tvect.shape
- else:
- # create analogsignal (sinus of 3 Hz)
- sig = np.sin(2*np.pi*tvect*sinus_freq + channel_index/5.*2*np.pi)+np.random.rand(tvect.size)
- anasig = AnalogSignal(sig, units= 'V', sampling_rate=sr * pq.Hz,
- t_start=t_start * pq.s,
- channel_index=channel_index)
- # for attributes out of neo you can annotate
- anasig.annotate(info = 'it is a sinus of %f Hz' %sinus_freq )
- return anasig
- def read_spiketrain(self ,
- # the 2 first key arguments are imposed by neo.io API
- lazy = False,
- cascade = True,
- segment_duration = 15.,
- t_start = -1,
- channel_index = 0,
- ):
- """
- With this IO SpikeTrain can e acces directly with its channel number
- """
- # There are 2 possibles behaviour for a SpikeTrain
- # holding many Spike instance or directly holding spike times
- # we choose here the first :
- if not HAVE_SCIPY:
- raise SCIPY_ERR
- num_spike_by_spiketrain = 40
- sr = 10000.
- if lazy:
- times = [ ]
- else:
- times = (np.random.rand(num_spike_by_spiketrain)*segment_duration +
- t_start)
- # create a spiketrain
- spiketr = SpikeTrain(times, t_start = t_start*pq.s, t_stop = (t_start+segment_duration)*pq.s ,
- units = pq.s,
- name = 'it is a spiketrain from exampleio',
- )
- if lazy:
- # we add the attribute lazy_shape with the size if loaded
- spiketr.lazy_shape = (num_spike_by_spiketrain,)
- # ours spiketrains also hold the waveforms:
- # 1 generate a fake spike shape (2d array if trodness >1)
- w1 = -stats.nct.pdf(np.arange(11,60,4), 5,20)[::-1]/3.
- w2 = stats.nct.pdf(np.arange(11,60,2), 5,20)
- w = np.r_[ w1 , w2 ]
- w = -w/max(w)
- if not lazy:
- # in the neo API the waveforms attr is 3 D in case tetrode
- # in our case it is mono electrode so dim 1 is size 1
- waveforms = np.tile( w[np.newaxis,np.newaxis,:], ( num_spike_by_spiketrain ,1, 1) )
- waveforms *= np.random.randn(*waveforms.shape)/6+1
- spiketr.waveforms = waveforms*pq.mV
- spiketr.sampling_rate = sr * pq.Hz
- spiketr.left_sweep = 1.5* pq.s
- # for attributes out of neo you can annotate
- spiketr.annotate(channel_index = channel_index)
- return spiketr
|