Scheduled service maintenance on November 22


On Friday, November 22, 2024, between 06:00 CET and 18:00 CET, GIN services will undergo planned maintenance. Extended service interruptions should be expected. We will try to keep downtimes to a minimum, but recommend that users avoid critical tasks, large data uploads, or DOI requests during this time.

We apologize for any inconvenience.

stim_sequence.py 9.2 KB


  1. # -*- coding: utf-8 -*-
  2. import struct
  3. from array import array
  4. import pylab
  5. import os
  6. import numpy as np
  7. import logging
  8. logger = logging.getLogger(__name__)
  9. class FileTypeError(Exception):
  10. pass
  11. class StimSequenceFileHeader(object):
  12. def __init__(self,
  13. file_type_string,
  14. file_version_string,
  15. stim_bytes,
  16. sequence_length,
  17. min_stim,
  18. max_stim,
  19. ):
  20. self.type_string = file_type_string
  21. self.version_string = file_version_string
  22. self.stim_bytes = stim_bytes
  23. self.sequence_length = sequence_length
  24. self.min_stim = min_stim
  25. self.max_stim = max_stim
  26. def __repr__(self):
  27. display_attrs = ['type_string',
  28. 'version_string',
  29. 'stim_bytes',
  30. 'sequence_length',
  31. 'min_stim',
  32. 'max_stim']
  33. return "<{} {}\n>".format(self.__class__.__name__,
  34. ' '.join(["\n {}={}".format(p, getattr(self, p)) for p in display_attrs]))
  35. @classmethod
  36. def from_file_object(cls, fobj):
  37. (file_type_length,) = struct.unpack('i', fobj.read(4))
  38. if file_type_length != 16:
  39. raise FileTypeError
  40. file_type_string = fobj.read(file_type_length)
  41. if file_type_string != "StimulusSequence":
  42. raise FileTypeError
  43. (file_version_length,) = struct.unpack('i', fobj.read(4))
  44. if file_version_length != 4:
  45. raise FileTypeError
  46. file_version_string = fobj.read(file_version_length)
  47. if file_version_string != "1.00":
  48. raise FileTypeError
  49. (stim_bytes,) = struct.unpack('c', fobj.read(1))
  50. stim_bytes = ord(stim_bytes)
  51. (sequence_length,) = struct.unpack('i', fobj.read(4))
  52. (min_stim,) = struct.unpack('i', fobj.read(4))
  53. (max_stim,) = struct.unpack('i', fobj.read(4))
  54. file_header = cls(file_type_string,
  55. file_version_string,
  56. stim_bytes,
  57. sequence_length,
  58. min_stim,
  59. max_stim)
  60. logger.debug(str(file_header))
  61. return file_header
  62. class StimulusSequence(object):
  63. """Represents a sequence of stimulus numbers for usage in a NEST or objsim simulation.
  64. """
  65. def __init__(self, sequence=[], file_path=None):
  66. self.nparray = np.array(sequence)
  67. if file_path is not None:
  68. self.fromfile(file_path)
  69. def tofile(self, file_name):
  70. """Schreibe Sequenz-Datei.
  71. Wird eingelesen in ObjSim mit:
  72. bool StimulusSequence::Load(const char* _SequenceFileName)
  73. in IDL:
  74. FileTypeString="StimulusSequence"
  75. VersionString='1.00'
  76. StimBytes=4
  77. header = {TypeLen: strlen(FileTypeString) $
  78. , FileType: FileTypeString $
  79. , VersionLen: strlen(VersionString) $
  80. , Version: VersionString $
  81. , StimBytes: byte(StimBytes) $
  82. , nstim: long(nstim) $
  83. , Min: long(min(stim)) $
  84. , max: long(max(stim))}
  85. """
  86. stim_bytes=4
  87. file_type_string="StimulusSequence"
  88. version_string="1.00"
  89. min_stim=min(self.nparray)
  90. max_stim=max(self.nparray)
  91. fobj = open(file_name, 'w')
  92. fobj.write(struct.pack('i',len(file_type_string)))
  93. fobj.write(file_type_string)
  94. fobj.write(struct.pack('i', len(version_string)))
  95. fobj.write(version_string)
  96. fobj.write(chr(stim_bytes))
  97. fobj.write(struct.pack('i', len(self.nparray)))
  98. fobj.write(struct.pack('i', min_stim))
  99. fobj.write(struct.pack('i', max_stim))
  100. a = array('i', self.nparray)
  101. a.tofile(fobj)
  102. fobj.close()
  103. def tolist(self):
  104. return list(self.nparray)
  105. def fromfile(self, file_name):
  106. try:
  107. self.nparray = self.from_stimulus_sequence_file(file_name)
  108. except FileTypeError:
  109. self.nparray = self.from_input_record_file(file_name)
  110. @staticmethod
  111. def from_stimulus_sequence_file(file_name):
  112. logger.debug("from_stimulus_sequence_file()")
  113. fobj = open(file_name, 'rb')
  114. file_header = StimSequenceFileHeader.from_file_object(fobj)
  115. sequence_length = file_header.sequence_length
  116. if file_header.stim_bytes == 4:
  117. stimulus_sequence = array('i')
  118. elif file_header.stim_bytes == 1:
  119. stimulus_sequence = array('B') # unsigned char
  120. else:
  121. raise FileTypeError
  122. stimulus_sequence.fromfile(fobj, sequence_length)
  123. fobj.close()
  124. return np.array(stimulus_sequence)
  125. def from_input_record_file(self, file_name):
  126. logger.debug("from_input_record_file()")
  127. input_trace = InputTrace(file_name)
  128. return input_trace.get_stim_sequence()
  129. def as_list(self):
  130. return list(self.nparray)
  131. def remove_stimulus(self, stim):
  132. self.nparray = self.nparray[self.nparray != stim]
  133. def show(self, x0=0, x1=1000):
  134. self.Nx=20
  135. self.Ny=20
  136. x1 = min([x1, len(self.nparray)])
  137. XSeq = map(lambda x: x%self.Nx, self.nparray)
  138. YSeq = map(lambda x: x/self.Nx, self.nparray)
  139. x = range(x0,x1)
  140. fig = pylab.figure()
  141. pylab.subplot(311) #first subplot
  142. pylab.plot(x, YSeq[x0:x1], '.')
  143. pylab.ylabel("Y stimulus")
  144. pylab.grid(True)
  145. pylab.subplot(312) #second subplot
  146. pylab.plot(x, XSeq[x0:x1], '+')
  147. pylab.ylabel("X stimulus")
  148. pylab.grid(True)
  149. pylab.subplot(313)
  150. pylab.plot(x, self.nparray[x0:x1], 'o')
  151. pylab.grid(True)
  152. fig = pylab.figure()
  153. pylab.subplot(211)
  154. pylab.hist(XSeq, self.Nx, normed=1, facecolor='r', alpha=0.75)
  155. pylab.title("X sequence")
  156. pylab.subplot(212)
  157. pylab.hist(YSeq, self.Ny, normed=1, facecolor='g', alpha=0.75)
  158. pylab.title("Y sequence")
  159. pylab.show()
  160. class InputTrace:
  161. """Represents a trace of stimulus numbers for each time step of the simulation.
  162. """
  163. def __init__(self, file_name=None, nparray=None):
  164. if file_name is not None:
  165. self.nparray = self.load(file_name)
  166. elif nparray is not None:
  167. self.nparray = nparray
  168. @staticmethod
  169. def load(file_name):
  170. try:
  171. nparray = StimulusSequence.from_stimulus_sequence_file(file_name)
  172. except FileTypeError:
  173. nparray = InputTrace.load_objsim_input_trace(file_name)
  174. return nparray
  175. @staticmethod
  176. def load_objsim_input_trace(file_name):
  177. stim_bytes = 4
  178. file_size = os.path.getsize(file_name)
  179. sequence_length = file_size // stim_bytes
  180. with open(file_name, 'rb') as fobj:
  181. stimulus_sequence = array('i')
  182. stimulus_sequence.fromfile(fobj, sequence_length)
  183. m = min(stimulus_sequence)
  184. logger.info("min=" + str(m))
  185. m = max(stimulus_sequence)
  186. logger.info("max=" + str(m))
  187. nparray = np.array(stimulus_sequence)
  188. # subtract one because in ObjMovieInput::proceede:
  189. # RecordBuffer[t] = CurStimNr = MovieFile->GetFrameNumber()+1;
  190. nparray -= 1 # now NO STIMULUS == -1, lowest stimulus index == 0
  191. return nparray
  192. def __len__(self):
  193. if self.nparray is not None:
  194. return len(self.nparray)
  195. else:
  196. return 0
  197. def get_copy(self):
  198. return self.nparray.copy()
  199. def get_stim_sequence(self):
  200. stim_seq = self.get_copy()
  201. diff = stim_seq[1:] - stim_seq[:-1]
  202. change = diff != 0
  203. stim_seq = stim_seq[change]
  204. stim_seq = stim_seq[stim_seq != -1]
  205. return stim_seq
  206. def get_stim_onset(self):
  207. stim_seq = self.get_copy()
  208. diff = stim_seq[1:] - stim_seq[:-1]
  209. indices = np.arange(len(diff))
  210. change = diff != 0
  211. stim_start = change * (stim_seq[1:] != -1)
  212. ind = indices[stim_start]
  213. onset_times = zip(ind+1, stim_seq[1:][stim_start])
  214. return onset_times
  215. def getGapStimFile():
  216. dirname = "/home/frank/prog/objsim/data/movies/"
  217. filename = "test_gauss20x20_nojump20_ycont_xgap.sequence"
  218. return dirname+filename
  219. def getInputRecordFile():
  220. dirname = "/home/frank/data/sim/csim/som02/seq_test/"
  221. filename = "MovieScanInput0.inp.dat"
  222. return dirname+filename
  223. def getDiagonalStimFile():
  224. dirname = "/home/frank/prog/objsim/data/movies/"
  225. filename = "diagonal.sequence"
  226. return dirname+filename
  227. def show_stim_sequence(filename, x0=0, x1=1000):
  228. seq = StimulusSequence()
  229. seq.fromfile(filename)
  230. seq.show(x0, x1)
  231. def read_input_file(file_path):
  232. stim = StimulusSequence(file_path=file_path)
  233. return stim.tolist()
  234. if __name__=='__main__':
  235. a = StimulusSequence(0)
  236. path = getGapStimFile()
  237. path = getInputRecordFile()
  238. #path = getDiagonalStimFile()
  239. a.fromfile(path)
  240. a.show(0,390)