# -*- coding: utf-8 -*-
"""Read, write and inspect stimulus sequence files and input traces.

Two on-disk formats are handled:

* the *stimulus sequence* format, which starts with a small self-describing
  header (see :class:`StimSequenceFileHeader`) followed by the stimulus
  numbers, and
* the raw *input record* format, a headerless dump of native C ``int``
  values holding one stimulus number per simulation time step
  (see :class:`InputTrace`).
"""

import logging
import os
import struct
from array import array

import numpy as np

try:
    import pylab
except ImportError:
    # Plotting is optional: only StimulusSequence.show() needs pylab, so the
    # file readers/writers stay usable on systems without matplotlib.
    pylab = None

logger = logging.getLogger(__name__)

# Magic strings stored in the header of a stimulus sequence file.
_FILE_TYPE = b"StimulusSequence"
_FILE_VERSION = b"1.00"

# Native C int, the integer layout used throughout both file formats.
_INT = struct.Struct('i')

# Value that marks "no stimulus shown" in an input trace (after the -1 shift
# applied in InputTrace.load_objsim_input_trace).
_NO_STIMULUS = -1


class FileTypeError(Exception):
    """Raised when a file is not a valid stimulus sequence file."""


def _read_exact(fobj, size):
    """Read exactly *size* bytes from *fobj* or raise FileTypeError."""
    data = fobj.read(size)
    if len(data) != size:
        raise FileTypeError("unexpected end of file in header")
    return data


def _read_int(fobj):
    """Read one native C int from *fobj*; FileTypeError on a short read."""
    (value,) = _INT.unpack(_read_exact(fobj, _INT.size))
    return value


class StimSequenceFileHeader(object):
    """Header fields of a stimulus sequence file.

    Attributes (all set by ``__init__``): ``type_string``, ``version_string``,
    ``stim_bytes`` (bytes per stored stimulus number), ``sequence_length``,
    ``min_stim`` and ``max_stim``.
    """

    _DISPLAY_ATTRS = ('type_string', 'version_string', 'stim_bytes',
                      'sequence_length', 'min_stim', 'max_stim')

    def __init__(self,
                 file_type_string,
                 file_version_string,
                 stim_bytes,
                 sequence_length,
                 min_stim,
                 max_stim,
                 ):
        self.type_string = file_type_string
        self.version_string = file_version_string
        self.stim_bytes = stim_bytes
        self.sequence_length = sequence_length
        self.min_stim = min_stim
        self.max_stim = max_stim

    def __repr__(self):
        fields = ' '.join("\n {}={}".format(name, getattr(self, name))
                          for name in self._DISPLAY_ATTRS)
        return "<{} {}\n>".format(self.__class__.__name__, fields)

    @classmethod
    def from_file_object(cls, fobj):
        """Parse a header from *fobj*, a file object opened in binary mode.

        The file position is left at the first byte after the header.

        Raises:
            FileTypeError: if the magic strings or their lengths do not
                match, or if the file ends before the header is complete.
        """
        if _read_int(fobj) != len(_FILE_TYPE):
            raise FileTypeError("unexpected file type string length")
        # Compare as bytes: a binary-mode read returns bytes, and comparing
        # those against a text literal never matches on Python 3.
        file_type = _read_exact(fobj, len(_FILE_TYPE))
        if file_type != _FILE_TYPE:
            raise FileTypeError("unexpected file type string")

        if _read_int(fobj) != len(_FILE_VERSION):
            raise FileTypeError("unexpected version string length")
        file_version = _read_exact(fobj, len(_FILE_VERSION))
        if file_version != _FILE_VERSION:
            raise FileTypeError("unexpected version string")

        (stim_bytes,) = struct.unpack('B', _read_exact(fobj, 1))
        sequence_length = _read_int(fobj)
        min_stim = _read_int(fobj)
        max_stim = _read_int(fobj)

        file_header = cls(file_type.decode('ascii'),
                          file_version.decode('ascii'),
                          stim_bytes, sequence_length, min_stim, max_stim)
        logger.debug("%s", file_header)
        return file_header


class StimulusSequence(object):
    """Represents a sequence of stimulus numbers for usage in a NEST or
    objsim simulation.
    """

    def __init__(self, sequence=(), file_path=None):
        """Create a sequence from *sequence*, or load it from *file_path*.

        When *file_path* is given, the loaded data replaces *sequence*.
        """
        self.nparray = np.array(sequence)
        if file_path is not None:
            self.fromfile(file_path)

    def tofile(self, file_name):
        """Write the sequence to *file_name* as a stimulus sequence file.

        The file is read in ObjSim with:
            bool StimulusSequence::Load(const char* _SequenceFileName)

        Equivalent header definition in IDL:
            FileTypeString="StimulusSequence"
            VersionString='1.00'
            StimBytes=4
            header = {TypeLen: strlen(FileTypeString) $
                      , FileType: FileTypeString $
                      , VersionLen: strlen(VersionString) $
                      , Version: VersionString $
                      , StimBytes: byte(StimBytes) $
                      , nstim: long(nstim) $
                      , Min: long(min(stim)) $
                      , max: long(max(stim))}

        Stimulus numbers are always stored as 4-byte native ints. An empty
        sequence is written with min and max set to 0.

        Raises:
            struct.error: if a stimulus number does not fit into a C int.
            TypeError: if the sequence holds non-integer values.
        """
        stim_bytes = 4
        values = np.asarray(self.nparray)
        if len(values) == 0:
            min_stim = max_stim = 0
            payload = b""
        else:
            min_stim = int(values.min())
            max_stim = int(values.max())
            # 'same_kind' rejects float data instead of silently truncating.
            payload = values.astype(np.intc, casting='same_kind').tobytes()

        # Build the complete header first so that a packing error cannot
        # leave a half-written file behind.
        header = b"".join([
            _INT.pack(len(_FILE_TYPE)),
            _FILE_TYPE,
            _INT.pack(len(_FILE_VERSION)),
            _FILE_VERSION,
            struct.pack('B', stim_bytes),
            _INT.pack(len(values)),
            _INT.pack(min_stim),
            _INT.pack(max_stim),
        ])

        # Binary mode is required: text mode rejects bytes on Python 3 and
        # rewrites line endings on Windows.
        with open(file_name, 'wb') as fobj:
            fobj.write(header)
            fobj.write(payload)

    def tolist(self):
        """Return the stimulus numbers as a list."""
        return list(self.nparray)

    def fromfile(self, file_name):
        """Load the sequence from *file_name*.

        The file is first parsed as a stimulus sequence file; if it is not
        one, it is read as a raw input record and reduced to a sequence.
        """
        try:
            self.nparray = self.from_stimulus_sequence_file(file_name)
        except FileTypeError:
            self.nparray = self.from_input_record_file(file_name)

    @staticmethod
    def from_stimulus_sequence_file(file_name):
        """Read a stimulus sequence file and return its data as an array.

        Raises:
            FileTypeError: if the header is invalid or ``stim_bytes`` is
                neither 4 nor 1.
        """
        logger.debug("from_stimulus_sequence_file()")
        # The context manager closes the file even when the header is bad.
        with open(file_name, 'rb') as fobj:
            file_header = StimSequenceFileHeader.from_file_object(fobj)
            if file_header.stim_bytes == 4:
                stimulus_sequence = array('i')
            elif file_header.stim_bytes == 1:
                stimulus_sequence = array('B')  # unsigned char
            else:
                raise FileTypeError("unsupported stim_bytes value")
            stimulus_sequence.fromfile(fobj, file_header.sequence_length)
        return np.array(stimulus_sequence)

    def from_input_record_file(self, file_name):
        """Read an input record file and return its stimulus sequence."""
        logger.debug("from_input_record_file()")
        input_trace = InputTrace(file_name)
        return input_trace.get_stim_sequence()

    def as_list(self):
        """Alias of :meth:`tolist`."""
        return self.tolist()

    def remove_stimulus(self, stim):
        """Drop every occurrence of stimulus number *stim*."""
        self.nparray = self.nparray[self.nparray != stim]

    def show(self, x0=0, x1=1000, nx=20, ny=20):
        """Plot the sequence and histograms of its X and Y components.

        A stimulus number ``s`` is split into ``x = s % nx`` and
        ``y = s // nx``.

        Args:
            x0: first sequence index shown in the line plots.
            x1: end index (exclusive), clipped to the sequence length.
            nx: grid width; also the bin count of the X histogram.
            ny: bin count of the Y histogram.

        Raises:
            ImportError: if pylab could not be imported.
        """
        if pylab is None:
            raise ImportError("pylab (matplotlib) is required for show()")

        self.Nx = nx
        self.Ny = ny
        x1 = min(x1, len(self.nparray))
        # Array arithmetic yields sliceable results; floor division keeps
        # the row index integral on Python 3 as well.
        x_seq = self.nparray % self.Nx
        y_seq = self.nparray // self.Nx
        x = range(x0, x1)

        pylab.figure()
        pylab.subplot(311)  # first subplot
        pylab.plot(x, y_seq[x0:x1], '.')
        pylab.ylabel("Y stimulus")
        pylab.grid(True)

        pylab.subplot(312)  # second subplot
        pylab.plot(x, x_seq[x0:x1], '+')
        pylab.ylabel("X stimulus")
        pylab.grid(True)

        pylab.subplot(313)
        pylab.plot(x, self.nparray[x0:x1], 'o')
        pylab.grid(True)

        pylab.figure()
        pylab.subplot(211)
        # 'density' replaces the 'normed' keyword removed from matplotlib.
        pylab.hist(x_seq, self.Nx, density=True, facecolor='r', alpha=0.75)
        pylab.title("X sequence")

        pylab.subplot(212)
        pylab.hist(y_seq, self.Ny, density=True, facecolor='g', alpha=0.75)
        pylab.title("Y sequence")

        pylab.show()


class InputTrace(object):
    """Represents a trace of stimulus numbers for each time step of the
    simulation.
    """

    def __init__(self, file_name=None, nparray=None):
        """Load the trace from *file_name*, else use *nparray* (may be None).
        """
        # Always define the attribute so that len() works on an empty trace.
        self.nparray = None
        if file_name is not None:
            self.nparray = self.load(file_name)
        elif nparray is not None:
            self.nparray = nparray

    @staticmethod
    def load(file_name):
        """Read *file_name* as a stimulus sequence file or, failing that,
        as a raw objsim input trace.
        """
        try:
            nparray = StimulusSequence.from_stimulus_sequence_file(file_name)
        except FileTypeError:
            nparray = InputTrace.load_objsim_input_trace(file_name)
        return nparray

    @staticmethod
    def load_objsim_input_trace(file_name):
        """Read a headerless file of native ints and shift every value by -1.

        Trailing bytes that do not form a complete int are ignored.
        """
        stimulus_sequence = array('i')
        sequence_length = (os.path.getsize(file_name)
                           // stimulus_sequence.itemsize)
        with open(file_name, 'rb') as fobj:
            stimulus_sequence.fromfile(fobj, sequence_length)

        if stimulus_sequence:  # min()/max() raise on an empty sequence
            logger.info("min=%s", min(stimulus_sequence))
            logger.info("max=%s", max(stimulus_sequence))

        nparray = np.array(stimulus_sequence)
        # subtract one because in ObjMovieInput::proceede:
        # RecordBuffer[t] = CurStimNr = MovieFile->GetFrameNumber()+1;
        nparray -= 1
        # now NO STIMULUS == -1, lowest stimulus index == 0
        return nparray

    def __len__(self):
        if self.nparray is not None:
            return len(self.nparray)
        return 0

    def get_copy(self):
        """Return a copy of the underlying array."""
        return self.nparray.copy()

    def get_stim_sequence(self):
        """Return one stimulus number per run of identical trace values.

        A sample is kept when the following sample differs from it, and
        samples equal to -1 (no stimulus) are dropped afterwards.

        NOTE(review): the run that extends to the end of the trace has no
        following change and is therefore not reported. This matches what
        the original mask-based indexing produced on old NumPy releases;
        confirm that it is the intended behaviour.
        """
        stim_seq = self.get_copy()
        change = (stim_seq[1:] - stim_seq[:-1]) != 0
        # 'change' has one element less than the trace, so the mask must be
        # applied to stim_seq[:-1]; a length mismatch is an IndexError on
        # current NumPy.
        stim_seq = stim_seq[:-1][change]
        return stim_seq[stim_seq != _NO_STIMULUS]

    def get_stim_onset(self):
        """Return a list of ``(time_step, stimulus)`` onset pairs.

        An onset is a time step whose value differs from the previous step
        and is not -1 (no stimulus).
        """
        stim_seq = self.get_copy()
        following = stim_seq[1:]
        change = (following - stim_seq[:-1]) != 0
        stim_start = change & (following != _NO_STIMULUS)
        ind = np.flatnonzero(stim_start)
        # Materialise the pairs: zip() is a one-shot iterator on Python 3.
        return list(zip(ind + 1, following[stim_start]))


def getGapStimFile():
    """Return the path of the example sequence file with an X gap."""
    dirname = "/home/frank/prog/objsim/data/movies/"
    filename = "test_gauss20x20_nojump20_ycont_xgap.sequence"
    return dirname + filename


def getInputRecordFile():
    """Return the path of the example input record file."""
    dirname = "/home/frank/data/sim/csim/som02/seq_test/"
    filename = "MovieScanInput0.inp.dat"
    return dirname + filename


def getDiagonalStimFile():
    """Return the path of the example diagonal sequence file."""
    dirname = "/home/frank/prog/objsim/data/movies/"
    filename = "diagonal.sequence"
    return dirname + filename


def show_stim_sequence(filename, x0=0, x1=1000):
    """Load the sequence stored in *filename* and plot it."""
    seq = StimulusSequence()
    seq.fromfile(filename)
    seq.show(x0, x1)


def read_input_file(file_path):
    """Load *file_path* and return its stimulus sequence as a list."""
    stim = StimulusSequence(file_path=file_path)
    return stim.tolist()


if __name__ == '__main__':
    # Other example inputs: getGapStimFile(), getDiagonalStimFile()
    a = StimulusSequence()
    path = getInputRecordFile()
    a.fromfile(path)
    a.show(0, 390)