# -*- coding: utf-8 -*-
"""Some classes and functions to load and save MultiViewVideo files (*.idlmov).

Client code only needs class StimulusSet.

Save a list of pictures as stimulus set file:

>>> stim_set = StimulusSet(pics=pics)
>>> stim_set.save(file_path)

Load a stimulus file:

>>> stim_set = StimulusSet(file_path)

Binary layout, as read and written by the classes below (all numbers use the
machine's native ``struct``/``array`` representation):

* header: 12-byte info string, 8-byte version string, three ints
  (width, height, number of filters)
* per filter: six ints (filter width, filter height, width shift, height
  shift, output width, output height) followed by ``width * height`` floats
* frames: ``output_width * output_height`` floats each, until end of file
"""
import array
import copy
import logging
import struct

import numpy as np

import objsimpy.element_tree_object as eto
from objsimpy.xmltodict import unparse as xml_from_dict

# Size in bytes of one value stored with struct/array type code 'f'.
_FLOAT_BYTES = 4


def _to_native_str(raw):
    """Return *raw* as the interpreter's native ``str`` type.

    On Python 2 ``struct.unpack`` already yields ``str``; on Python 3 it
    yields ``bytes``, which are decoded here.
    """
    if isinstance(raw, str):
        return raw
    return raw.decode('utf-8')


def _to_bytes(text):
    """Return *text* as a byte string suitable for ``struct.pack('Ns', ...)``."""
    if isinstance(text, bytes):
        return text
    return text.encode('utf-8')


def remove_null_character(s):
    """Return the part of *s* that precedes the first NUL character.

    Works for both byte strings and text strings; the result has the same
    type as the input.  If *s* contains no NUL character it is returned
    unchanged (a field that fills its whole fixed width has no terminator
    and must not be discarded).
    """
    null = b'\x00' if isinstance(s, bytes) else u'\x00'
    return s.partition(null)[0]


class StimMetaData:
    """Stimulus meta data read from an xml file.

    Attributes:
        eto: the ``EtObject`` built from the xml file
        movie_file_name: value of ``StimParas[0].MovieFileName[0]``
        ny_paras (int): value of ``StimParas[0].NYParas[0]``
        nx_paras (int): value of ``StimParas[0].NXParas[0]``
    """

    def __init__(self, file_path):
        """Load the meta data from the xml file at *file_path*."""
        self.eto = eto.EtObject(file_path=file_path)
        stim_paras = self.eto.StimParas[0]
        self.movie_file_name = stim_paras.MovieFileName[0].value
        self.ny_paras = int(stim_paras.NYParas[0].value)
        self.nx_paras = int(stim_paras.NXParas[0].value)


def write_meta_xml(meta_dict, fname_meta):
    """Write stimulus meta data xml file.

    The dictionary is deep-copied first, so *meta_dict* itself is not
    modified.

    Args:
        meta_dict (dict): stimulus meta data in a dictionary
        fname_meta (string): name of xml file
    """
    meta_copy = copy.deepcopy(meta_dict)
    make_content_value_recursive(meta_copy)
    with open(fname_meta, 'w') as fobj_meta:
        fobj_meta.write(xml_from_dict(meta_copy, pretty=True))


def make_content_value_recursive(meta_dict):
    """Replace every non-dict value by ``{'@value': str(value)}``, in place.

    Nested dictionaries are processed recursively.

    Args:
        meta_dict (dict): dictionary to convert; it is modified in place
    """
    # Iterate over a snapshot of the keys because values are reassigned
    # while looping.
    for key in list(meta_dict):
        value = meta_dict[key]
        if isinstance(value, dict):
            make_content_value_recursive(value)
        else:
            meta_dict[key] = {'@value': str(value)}


class StimFileHeader:
    """Header of a stimulus file.

    Args:
        file_obj (file, optional): open binary file; if given, the header is
            read from it and all other arguments are ignored
        info (str): info string, stored in a 12-byte field
        version (str): version string, stored in an 8-byte field
        width (int): width entry of the header
        height (int): height entry of the header
        nfilters (int): number of filters that follow the header
    """

    def __init__(self, file_obj=None, info="MindVideo", version="0.05",
                 width=1, height=1, nfilters=1):
        if file_obj is not None:
            self.from_file(file_obj)
        else:
            self.info = info
            self.version = version
            self.width = width
            self.height = height
            self.nfilters = nfilters

    def from_file(self, file_obj):
        """Read the header fields from the open binary file *file_obj*.

        Raises:
            struct.error: if the file ends before the header is complete
        """
        (info,) = struct.unpack('12s', file_obj.read(12))
        self.info = _to_native_str(remove_null_character(info))
        (version,) = struct.unpack('8s', file_obj.read(8))
        self.version = _to_native_str(remove_null_character(version))
        (self.width, self.height, self.nfilters) = struct.unpack(
            'iii', file_obj.read(12))

    def save(self, file_obj):
        """Write the header to the open binary file *file_obj*."""
        # struct's 's' format needs byte strings; it pads with NULs or
        # truncates to the field width.
        file_obj.write(struct.pack('12s', _to_bytes(self.info)))
        file_obj.write(struct.pack('8s', _to_bytes(self.version)))
        file_obj.write(struct.pack('iii', self.width, self.height,
                                   self.nfilters))

    def _describe(self):
        """Return a one-line description listing all header attributes."""
        attrs = ["info", "version", "width", "height", "nfilters"]
        fields = ", ".join("%s=%r" % (name, getattr(self, name, None))
                           for name in attrs)
        return "StimFileHeader(%s)" % fields

    def __repr__(self):
        return self._describe()

    def __unicode__(self):
        # Only used by Python 2's unicode(); concatenation with u"" converts
        # the native description to a unicode string there.
        return u"" + self._describe()


class Filter:
    """A filter entry of a stimulus file.

    Args:
        open_file (file, optional): open binary file; if given, the filter is
            read from it and all other arguments are ignored
        pic (ndarray, optional): 2d array with the filter values; if omitted,
            a zero array with *fheight* rows and *fwidth* columns is used
        fwidth (int): filter width, only used when *pic* is omitted
        fheight (int): filter height, only used when *pic* is omitted
        wshift (int): stored as ``wshift``
        hshift (int): stored as ``hshift``
        output_width (int): stored as ``output_width``
        output_height (int): stored as ``output_height``
        output_size (int): kept for backward compatibility and ignored; the
            attribute ``output_size`` is always
            ``output_width * output_height``, as it is when loading a file

    Attributes:
        pic (ndarray): 2d array of shape ``(fheight, fwidth)``
        fwidth, fheight, fsize (int): filter dimensions and their product
        wshift, hshift (int): shift values
        output_width, output_height, output_size (int): output dimensions
            and their product
    """

    def __init__(self, open_file=None, pic=None, fwidth=2, fheight=2,
                 wshift=0, hshift=0, output_width=5, output_height=5,
                 output_size=25):
        if open_file is not None:
            self.from_file(open_file)
        else:
            if pic is None:
                # Rows correspond to the height, columns to the width, see
                # set_filter_dimensions_from_pic().
                pic = np.zeros((fheight, fwidth))
            self.set_filter_dimensions_from_pic(pic)
            self.wshift = wshift
            self.hshift = hshift
            self.output_width = output_width
            self.output_height = output_height
            self.output_size = output_width * output_height

    def set_filter_dimensions_from_pic(self, pic):
        """Store *pic* and derive ``fheight``, ``fwidth`` and ``fsize`` from
        its shape (rows are the height, columns the width)."""
        self.pic = pic
        self.fheight = pic.shape[0]
        self.fwidth = pic.shape[1]
        self.fsize = self.fwidth * self.fheight

    def from_file(self, file_obj):
        """Read the filter from the open binary file *file_obj*.

        Raises:
            struct.error: if the file ends before the filter is complete
        """
        n_integers = 6
        data = struct.unpack('i' * n_integers,
                             file_obj.read(4 * n_integers))
        self.fwidth, self.fheight = data[:2]
        self.fsize = self.fwidth * self.fheight
        self.wshift, self.hshift = data[2:4]
        self.output_width, self.output_height = data[4:6]
        self.output_size = self.output_width * self.output_height
        values = struct.unpack('f' * self.fsize,
                               file_obj.read(_FLOAT_BYTES * self.fsize))
        # Keep pic as a 2d array, like the constructor does, so that save()
        # (which uses pic.flat) also works for filters loaded from a file.
        self.pic = np.array(values, dtype=float).reshape(
            (self.fheight, self.fwidth))

    def save(self, file_obj):
        """Write the six integer fields and the filter values to the open
        binary file *file_obj*."""
        data = [self.fwidth, self.fheight, self.wshift, self.hshift,
                self.output_width, self.output_height]
        array.array('i', data).tofile(file_obj)
        array.array('f', np.asarray(self.pic).flat).tofile(file_obj)

    def __repr__(self):
        attrs = ["fwidth", "fheight", "wshift", "hshift", "output_width",
                 "output_height", "output_size"]
        # getattr with a default: a repr must not fail on a partially
        # initialised instance.
        fields = ", ".join("%s=%r" % (name, getattr(self, name, None))
                           for name in attrs)
        return "Filter(%s)" % fields


class StimulusSet:
    """Represents a set of images used as stimuli for neural network.

    A single stimulus is a frame, consisting of an image for every filter.
    Currently only stimulus sets with single filter are supported.

    Args:
        file_path (str, optional): if given, the stimulus set will be loaded
            from there
        pics (list of ndarray, optional): List of 2d numpy arrays
            representing stimulus pictures

    Attributes:
        pics (list of ndarray): list of images (2d numpy arrays)
        header (StimFileHeader): header of the loaded file, None if the set
            was not loaded from a file
        filters (list of Filter): filters of the loaded or saved file, None
            until the set has been loaded or saved
    """

    def __init__(self, file_path=None, pics=None):
        """Initializes an instance of StimulusSet.

        If both arguments are given, the pictures loaded from *file_path*
        replace *pics*.

        Args:
            file_path (str, optional): if given, the stimulus set will be
                loaded from there
            pics (list of ndarray, optional): List of 2d numpy arrays
                representing stimulus pictures
        """
        self.header = None
        self.filters = None
        if pics is None:
            self.pics = []
        else:
            self.pics = pics
        if file_path is not None:
            self.pics = self.from_file(file_path)

    def add_frame(self, pics=None):
        """Append a frame to the stimulus set.

        Args:
            pics (sequence of ndarray): the images of the frame, one per
                filter; exactly one image is supported

        Raises:
            NotImplementedError: if more than one image is given
            IndexError: if no image is given
        """
        if pics is None:
            pics = []
        if len(pics) > 1:
            raise NotImplementedError(
                "frames with more than one picture are not supported")
        if len(pics) == 0:
            raise IndexError("add_frame() needs a sequence with one picture")
        self.pics.append(pics[0])

    def get_output_width(self, filter_num=0):
        """Return the output width of filter *filter_num*, or 0 if the set
        has no filters yet."""
        if self.filters is not None:
            return self.filters[filter_num].output_width
        else:
            return 0

    def get_output_height(self, filter_num=0):
        """Return the output height of filter *filter_num*, or 0 if the set
        has no filters yet."""
        if self.filters is not None:
            return self.filters[filter_num].output_height
        else:
            return 0

    def from_file(self, file_path):
        """Loads a stimulus set from file_path.

        Sets ``header`` and ``filters``; the pictures are returned, not
        stored.  A trailing incomplete frame is ignored.

        Args:
            file_path (str): path of the stimulus file

        Returns:
            list of ndarray: the frames, each of shape
            ``(output_height, output_width)``

        Raises:
            NotImplementedError: if the file contains more than one filter
            ValueError: if the filter's output size is not positive
        """
        # The with-block closes the file even if an exception is raised.
        with open(file_path, "rb") as file_obj:
            self.header = StimFileHeader(file_obj)
            logging.debug("%s", self.header)
            self.filters = []
            for _ in range(self.header.nfilters):
                loaded_filter = Filter(file_obj)
                self.filters.append(loaded_filter)
                logging.debug("%s", loaded_filter)
            if self.header.nfilters > 1:
                raise NotImplementedError(
                    "files with more than one filter are not supported")
            img_filter = self.filters[0]
            n_values = img_filter.output_size
            if n_values <= 0:
                # With a frame size of zero bytes the read loop below would
                # never see a short read and would not terminate.
                raise ValueError(
                    "invalid output size %r in %r" % (n_values, file_path))
            frame_shape = (img_filter.output_height, img_filter.output_width)
            n_bytes = _FLOAT_BYTES * n_values
            pics = []
            while True:
                data = file_obj.read(n_bytes)
                if len(data) < n_bytes:
                    break
                # Native 4-byte floats, widened to float64 (the dtype numpy
                # would choose for a sequence of Python floats).
                frame = np.frombuffer(data, dtype=np.float32)
                pics.append(frame.astype(np.float64).reshape(frame_shape))
        return pics

    def save(self, file_path):
        """Saves the stimulus set to file_path.

        Writes header, filters and frames, in this order.
        """
        # The with-block closes the file even if writing fails.
        with open(file_path, "wb") as file_obj:
            self.save_header(file_obj)
            self.save_filters(file_obj)
            self.save_frames(file_obj)

    # only with 1 filter implemented
    def save_header(self, file_obj):
        """Write the file header to the open binary file *file_obj*.

        Width and height are taken from the shape of the first picture.  If
        the set has no filters yet, a default filter with matching output
        dimensions is created.

        Raises:
            NotImplementedError: if the set has more than one filter
        """
        height, width = self.pics[0].shape
        if self.filters is None:
            self.filters = [Filter(output_width=width, output_height=height)]
        nfilters = len(self.filters)
        if nfilters > 1:
            raise NotImplementedError(
                "saving more than one filter is not supported")
        header = StimFileHeader(info="MindVideo", version="0.05",
                                width=width, height=height,
                                nfilters=nfilters)
        header.save(file_obj)

    def save_filters(self, file_obj):
        """Write all filters to the open binary file *file_obj*."""
        for f in self.filters:
            f.save(file_obj)

    def save_frames(self, file_obj):
        """Write all pictures as 4-byte floats to the open binary file
        *file_obj*.

        Raises:
            NotImplementedError: if the set has more than one filter
        """
        nfilters = len(self.filters)
        if nfilters > 1:
            raise NotImplementedError(
                "saving more than one filter is not supported")
        for pic in self.pics:
            array.array('f', pic.flat).tofile(file_obj)


def load_stim_movie():
    """Placeholder; not implemented yet."""
    pass