Scheduled service maintenance on November 22


On Friday, November 22, 2024, between 06:00 CET and 18:00 CET, GIN services will undergo planned maintenance. Extended service interruptions should be expected. We will try to keep downtimes to a minimum, but recommend that users avoid critical tasks, large data uploads, or DOI requests during this time.

We apologize for any inconvenience.

exampleio.py 12 KB


  1. # -*- coding: utf-8 -*-
  2. """
  3. Class for "reading" fake data from an imaginary file.
  4. For the user, it generates a :class:`Segment` or a :class:`Block` with a
  5. sinusoidal :class:`AnalogSignal`, a :class:`SpikeTrain` and an
  6. :class:`Event`.
  7. For a developer, it is just an example showing guidelines for someone who wants
  8. to develop a new IO module.
  9. Depends on: scipy
  10. Supported: Read
  11. Author: sgarcia
  12. """
  13. # needed for python 3 compatibility
  14. from __future__ import absolute_import
  15. # note neo.core needs only numpy and quantities
  16. import numpy as np
  17. import quantities as pq
  18. # but my specific IO can depend on many other packages
  19. try:
  20. from scipy import stats
  21. except ImportError as err:
  22. HAVE_SCIPY = False
  23. SCIPY_ERR = err
  24. else:
  25. HAVE_SCIPY = True
  26. SCIPY_ERR = None
  27. # I need to subclass BaseIO
  28. from neo.io.baseio import BaseIO
  29. # to import from core
  30. from neo.core import Segment, AnalogSignal, SpikeTrain, Event
  31. # I need to subclass BaseIO
  32. class ExampleIO(BaseIO):
  33. """
  34. Class for "reading" fake data from an imaginary file.
  35. For the user, it generates a :class:`Segment` or a :class:`Block` with a
  36. sinusoidal :class:`AnalogSignal`, a :class:`SpikeTrain` and an
  37. :class:`Event`.
  38. For a developer, it is just an example showing guidelines for someone who wants
  39. to develop a new IO module.
  40. Two rules for developers:
  41. * Respect the Neo IO API (:ref:`neo_io_API`)
  42. * Follow :ref:`io_guiline`
  43. Usage:
  44. >>> from neo import io
  45. >>> r = io.ExampleIO(filename='itisafake.nof')
  46. >>> seg = r.read_segment(lazy=False, cascade=True)
  47. >>> print(seg.analogsignals) # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE
  48. [<AnalogSignal(array([ 0.19151945, 0.62399373, 0.44149764, ..., 0.96678374,
  49. ...
  50. >>> print(seg.spiketrains) # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE
  51. [<SpikeTrain(array([ -0.83799524, 6.24017951, 7.76366686, 4.45573701,
  52. 12.60644415, 10.68328994, 8.07765735, 4.89967804,
  53. ...
  54. >>> print(seg.events) # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE
  55. [<Event: TriggerB@9.6976 s, TriggerA@10.2612 s, TriggerB@2.2777 s, TriggerA@6.8607 s, ...
  56. >>> anasig = r.read_analogsignal(lazy=True, cascade=False)
  57. >>> print(anasig._data_description)
  58. {'shape': (150000,)}
  59. >>> anasig = r.read_analogsignal(lazy=False, cascade=False)
  60. """
  61. is_readable = True # This class can only read data
  62. is_writable = False # write is not supported
  63. # This class is able to directly or indirectly handle the following objects
  64. # You can notice that this greatly simplifies the full Neo object hierarchy
  65. supported_objects = [ Segment , AnalogSignal, SpikeTrain, Event ]
  66. # This class can return either a Block or a Segment
  67. # The first one is the default ( self.read )
  68. # These lists should go from highest object to lowest object because
  69. # common_io_test assumes it.
  70. readable_objects = [ Segment , AnalogSignal, SpikeTrain ]
  71. # This class is not able to write objects
  72. writeable_objects = [ ]
  73. has_header = False
  74. is_streameable = False
  75. # This is for GUI stuff : a definition for parameters when reading.
  76. # This dict should be keyed by object (`Block`). Each entry is a list
  77. # of tuple. The first entry in each tuple is the parameter name. The
  78. # second entry is a dict with keys 'value' (for default value),
  79. # and 'label' (for a descriptive name).
  80. # Note that if the highest-level object requires parameters,
  81. # common_io_test will be skipped.
  82. read_params = {
  83. Segment : [
  84. ('segment_duration',
  85. {'value' : 15., 'label' : 'Segment size (s.)'}),
  86. ('num_analogsignal',
  87. {'value' : 8, 'label' : 'Number of recording points'}),
  88. ('num_spiketrain_by_channel',
  89. {'value' : 3, 'label' : 'Num of spiketrains'}),
  90. ],
  91. }
  92. # do not supported write so no GUI stuff
  93. write_params = None
  94. name = 'example'
  95. extensions = [ 'nof' ]
  96. # mode can be 'file' or 'dir' or 'fake' or 'database'
  97. # the main case is 'file' but some reader are base on a directory or a database
  98. # this info is for GUI stuff also
  99. mode = 'fake'
  100. def __init__(self , filename = None) :
  101. """
  102. Arguments:
  103. filename : the filename
  104. Note:
  105. - filename is here just for exampe because it will not be take in account
  106. - if mode=='dir' the argument should be dirname (See TdtIO)
  107. """
  108. BaseIO.__init__(self)
  109. self.filename = filename
  110. # Seed so all instances can return the same values
  111. np.random.seed(1234)
  112. # Segment reading is supported so I define this :
  113. def read_segment(self,
  114. # the 2 first keyword arguments are imposed by neo.io API
  115. lazy = False,
  116. cascade = True,
  117. # all following arguments are decied by this IO and are free
  118. segment_duration = 15.,
  119. num_analogsignal = 4,
  120. num_spiketrain_by_channel = 3,
  121. ):
  122. """
  123. Return a fake Segment.
  124. The self.filename does not matter.
  125. In this IO read by default a Segment.
  126. This is just a example to be adapted to each ClassIO.
  127. In this case these 3 paramters are taken in account because this function
  128. return a generated segment with fake AnalogSignal and fake SpikeTrain.
  129. Parameters:
  130. segment_duration :is the size in secend of the segment.
  131. num_analogsignal : number of AnalogSignal in this segment
  132. num_spiketrain : number of SpikeTrain in this segment
  133. """
  134. sampling_rate = 10000. #Hz
  135. t_start = -1.
  136. #time vector for generated signal
  137. timevect = np.arange(t_start, t_start+ segment_duration , 1./sampling_rate)
  138. # create an empty segment
  139. seg = Segment( name = 'it is a seg from exampleio')
  140. if cascade:
  141. # read nested analosignal
  142. for i in range(num_analogsignal):
  143. ana = self.read_analogsignal( lazy = lazy , cascade = cascade ,
  144. channel_index = i ,segment_duration = segment_duration, t_start = t_start)
  145. seg.analogsignals += [ ana ]
  146. # read nested spiketrain
  147. for i in range(num_analogsignal):
  148. for _ in range(num_spiketrain_by_channel):
  149. sptr = self.read_spiketrain(lazy = lazy , cascade = cascade ,
  150. segment_duration = segment_duration, t_start = t_start , channel_index = i)
  151. seg.spiketrains += [ sptr ]
  152. # create an Event that mimic triggers.
  153. # note that ExampleIO do not allow to acess directly to Event
  154. # for that you need read_segment(cascade = True)
  155. if lazy:
  156. # in lazy case no data are readed
  157. # eva is empty
  158. eva = Event()
  159. else:
  160. # otherwise it really contain data
  161. n = 1000
  162. # neo.io support quantities my vector use second for unit
  163. eva = Event(timevect[(np.random.rand(n)*timevect.size).astype('i')]* pq.s)
  164. # all duration are the same
  165. eva.durations = np.ones(n)*500*pq.ms # Event doesn't have durations. Is Epoch intended here?
  166. # label
  167. l = [ ]
  168. for i in range(n):
  169. if np.random.rand()>.6: l.append( 'TriggerA' )
  170. else : l.append( 'TriggerB' )
  171. eva.labels = np.array( l )
  172. seg.events += [ eva ]
  173. seg.create_many_to_one_relationship()
  174. return seg
  175. def read_analogsignal(self ,
  176. # the 2 first key arguments are imposed by neo.io API
  177. lazy = False,
  178. cascade = True,
  179. channel_index = 0,
  180. segment_duration = 15.,
  181. t_start = -1,
  182. ):
  183. """
  184. With this IO AnalogSignal can e acces directly with its channel number
  185. """
  186. sr = 10000.
  187. sinus_freq = 3. # Hz
  188. #time vector for generated signal:
  189. tvect = np.arange(t_start, t_start+ segment_duration , 1./sr)
  190. if lazy:
  191. anasig = AnalogSignal([], units='V', sampling_rate=sr * pq.Hz,
  192. t_start=t_start * pq.s,
  193. channel_index=channel_index)
  194. # we add the attribute lazy_shape with the size if loaded
  195. anasig.lazy_shape = tvect.shape
  196. else:
  197. # create analogsignal (sinus of 3 Hz)
  198. sig = np.sin(2*np.pi*tvect*sinus_freq + channel_index/5.*2*np.pi)+np.random.rand(tvect.size)
  199. anasig = AnalogSignal(sig, units= 'V', sampling_rate=sr * pq.Hz,
  200. t_start=t_start * pq.s,
  201. channel_index=channel_index)
  202. # for attributes out of neo you can annotate
  203. anasig.annotate(info = 'it is a sinus of %f Hz' %sinus_freq )
  204. return anasig
  205. def read_spiketrain(self ,
  206. # the 2 first key arguments are imposed by neo.io API
  207. lazy = False,
  208. cascade = True,
  209. segment_duration = 15.,
  210. t_start = -1,
  211. channel_index = 0,
  212. ):
  213. """
  214. With this IO SpikeTrain can e acces directly with its channel number
  215. """
  216. # There are 2 possibles behaviour for a SpikeTrain
  217. # holding many Spike instance or directly holding spike times
  218. # we choose here the first :
  219. if not HAVE_SCIPY:
  220. raise SCIPY_ERR
  221. num_spike_by_spiketrain = 40
  222. sr = 10000.
  223. if lazy:
  224. times = [ ]
  225. else:
  226. times = (np.random.rand(num_spike_by_spiketrain)*segment_duration +
  227. t_start)
  228. # create a spiketrain
  229. spiketr = SpikeTrain(times, t_start = t_start*pq.s, t_stop = (t_start+segment_duration)*pq.s ,
  230. units = pq.s,
  231. name = 'it is a spiketrain from exampleio',
  232. )
  233. if lazy:
  234. # we add the attribute lazy_shape with the size if loaded
  235. spiketr.lazy_shape = (num_spike_by_spiketrain,)
  236. # ours spiketrains also hold the waveforms:
  237. # 1 generate a fake spike shape (2d array if trodness >1)
  238. w1 = -stats.nct.pdf(np.arange(11,60,4), 5,20)[::-1]/3.
  239. w2 = stats.nct.pdf(np.arange(11,60,2), 5,20)
  240. w = np.r_[ w1 , w2 ]
  241. w = -w/max(w)
  242. if not lazy:
  243. # in the neo API the waveforms attr is 3 D in case tetrode
  244. # in our case it is mono electrode so dim 1 is size 1
  245. waveforms = np.tile( w[np.newaxis,np.newaxis,:], ( num_spike_by_spiketrain ,1, 1) )
  246. waveforms *= np.random.randn(*waveforms.shape)/6+1
  247. spiketr.waveforms = waveforms*pq.mV
  248. spiketr.sampling_rate = sr * pq.Hz
  249. spiketr.left_sweep = 1.5* pq.s
  250. # for attributes out of neo you can annotate
  251. spiketr.annotate(channel_index = channel_index)
  252. return spiketr