# -*- coding: utf-8 -*-
"""Some classes and functions to load and save MultiViewVideo files (*.idlmov)
Client code only needs class StimulusSet.
Save a list of pictures as stimulus set file:
>>> stim_set = StimulusSet(pics=pics)
>>> stim_set.save(file_path)
Load a stimulus file:
>>> stimset = StimulusSet(file_path)
"""
import objsimpy.element_tree_object as eto
import struct
import numpy as np
import array
import logging
import copy
from objsimpy.xmltodict import unparse as xml_from_dict
def remove_null_character(s):
    """Return ``s`` truncated at its first NUL character.

    Fixed-width string fields are padded with NUL characters; this strips
    the padding (and anything after the first NUL).

    Args:
        s (str or bytes): string that may contain NUL characters

    Returns:
        The part of ``s`` before the first NUL character, or ``s`` unchanged
        if it does not contain a NUL character.
    """
    # Search with a separator of the same kind as ``s`` so that byte strings
    # (e.g. results of struct.unpack) work as well as text strings.
    nul = b'\x00' if isinstance(s, (bytes, bytearray)) else u'\x00'
    end = s.find(nul)
    if end == -1:
        # No terminator: the text fills the whole field, so keep all of it.
        return s
    return s[:end]
class StimMetaData:
    """Stimulus meta data read through ``eto.EtObject``.

    Args:
        file_path: path handed to ``eto.EtObject`` as ``file_path``

    Attributes:
        eto: the ``EtObject`` created from ``file_path``
        movie_file_name: value of the first ``MovieFileName`` entry
        ny_paras (int): value of the first ``NYParas`` entry
        nx_paras (int): value of the first ``NXParas`` entry
    """

    def __init__(self, file_path):
        self.eto = eto.EtObject(file_path=file_path)
        # All values of interest live in the first StimParas entry.
        stim_paras = self.eto.StimParas[0]
        self.movie_file_name = stim_paras.MovieFileName[0].value
        self.ny_paras = int(stim_paras.NYParas[0].value)
        self.nx_paras = int(stim_paras.NXParas[0].value)
def write_meta_xml(meta_dict, fname_meta):
    """Write stimulus meta data to an xml file.

    The dictionary passed in is not modified; the conversion is done on a
    deep copy.

    Args:
        meta_dict (dict): stimulus meta data in a dictionary
        fname_meta (string): name of xml file
    """
    converted = copy.deepcopy(meta_dict)
    make_content_value_recursive(converted)
    with open(fname_meta, 'w') as xml_file:
        xml_text = xml_from_dict(converted, pretty=True)
        xml_file.write(xml_text)
def make_content_value_recursive(meta_dict):
    """Wrap every non-dict value of ``meta_dict`` as ``{'@value': str(value)}``.

    Nested dictionaries are processed recursively. The dictionary is changed
    in place; nothing is returned.

    Args:
        meta_dict (dict): dictionary to convert in place
    """
    for name in list(meta_dict.keys()):
        entry = meta_dict[name]
        if not isinstance(entry, dict):
            meta_dict[name] = {'@value': str(entry)}
            continue
        make_content_value_recursive(entry)
class StimFileHeader:
    """Header of a stimulus file.

    On disk the header is a 12 byte info string, an 8 byte version string and
    three native integers (width, height, number of filters).

    Args:
        file_obj (file, optional): binary file object; if given, the header is
            read from it and the remaining arguments are ignored
        info (str): info string, stored in a 12 byte field
        version (str): version string, stored in an 8 byte field
        width (int): width value stored in the header
        height (int): height value stored in the header
        nfilters (int): number of filters stored in the header
    """

    def __init__(self, file_obj=None, info="MindVideo", version="0.05", width=1, height=1, nfilters=1):
        if file_obj is not None:
            self.from_file(file_obj)
        else:
            self.info = info
            self.version = version
            self.width = width
            self.height = height
            self.nfilters = nfilters

    def _to_text(self, raw):
        """Return ``raw`` as a native ``str`` (decodes byte strings on Python 3)."""
        if isinstance(raw, str):
            return raw
        return raw.decode('utf-8', 'replace')

    def _to_bytes(self, text):
        """Return ``text`` as a byte string, as required by ``struct.pack``."""
        if isinstance(text, bytes):
            return text
        return text.encode('utf-8')

    def from_file(self, file_obj):
        """Read the header fields from the binary file object ``file_obj``."""
        (info,) = struct.unpack('12s', file_obj.read(12))
        self.info = remove_null_character(self._to_text(info))
        (version,) = struct.unpack('8s', file_obj.read(8))
        self.version = remove_null_character(self._to_text(version))
        (self.width, self.height, self.nfilters) = struct.unpack('iii', file_obj.read(12))

    def save(self, file_obj):
        """Write the header fields to the binary file object ``file_obj``."""
        # The 's' format needs byte strings; struct pads short values with NULs.
        file_obj.write(struct.pack('12s', self._to_bytes(self.info)))
        file_obj.write(struct.pack('8s', self._to_bytes(self.version)))
        file_obj.write(struct.pack('iii', self.width, self.height, self.nfilters))

    def _describe(self):
        """Return one ``name: value`` line per header attribute."""
        attrs = ["info", "version", "width", "height", "nfilters"]
        # repr() keeps the result printable whatever the string fields contain.
        return "\n".join("%s: %r" % (name, getattr(self, name, None)) for name in attrs)

    def __unicode__(self):
        return u"%s" % self._describe()

    def __str__(self):
        return self._describe()
class Filter:
    """A single filter of a stimulus file.

    On disk a filter is six native integers (fwidth, fheight, wshift, hshift,
    output_width, output_height) followed by fwidth * fheight native floats
    holding the filter picture row by row.

    Args:
        open_file (file, optional): binary file object; if given, the filter is
            read from it and the remaining arguments are ignored
        pic (ndarray, optional): 2d filter picture; its shape defines
            fheight and fwidth. Defaults to zeros of shape (fheight, fwidth).
        fwidth (int): width of the default picture (only used if pic is None)
        fheight (int): height of the default picture (only used if pic is None)
        wshift (int): stored as ``wshift``
        hshift (int): stored as ``hshift``
        output_width (int): stored as ``output_width``
        output_height (int): stored as ``output_height``
        output_size (int): kept for backward compatibility; the attribute
            ``output_size`` is always output_width * output_height, as it is
            when a filter is loaded from file
    """

    def __init__(self, open_file=None, pic=None, fwidth=2, fheight=2, wshift=0, hshift=0,
                 output_width=5, output_height=5, output_size=25):
        if open_file is not None:
            self.from_file(open_file)
            return
        if pic is None:
            # Rows first: shape[0] is the height, shape[1] the width.
            pic = np.zeros((fheight, fwidth))
        self.set_filter_dimensions_from_pic(pic)
        self.wshift = wshift
        self.hshift = hshift
        self.output_width = output_width
        self.output_height = output_height
        self.output_size = output_width * output_height

    def set_filter_dimensions_from_pic(self, pic):
        """Store ``pic`` and derive fheight, fwidth and fsize from its shape."""
        self.pic = pic
        self.fheight = pic.shape[0]
        self.fwidth = pic.shape[1]
        self.fsize = self.fwidth * self.fheight

    def from_file(self, file_obj):
        """Read the filter from the binary file object ``file_obj``."""
        n_integers = 6
        data = struct.unpack('i' * n_integers, file_obj.read(4 * n_integers))
        self.fwidth, self.fheight = data[:2]
        self.fsize = self.fwidth * self.fheight
        self.wshift, self.hshift = data[2:4]
        self.output_width, self.output_height = data[4:6]
        self.output_size = self.output_width * self.output_height
        values = struct.unpack('f' * self.fsize, file_obj.read(4 * self.fsize))
        # Keep pic a 2d array, as in the constructor, so save() works on it.
        self.pic = np.array(values).reshape((self.fheight, self.fwidth))

    def save(self, file_obj):
        """Write the filter to the binary file object ``file_obj``."""
        data = [self.fwidth, self.fheight, self.wshift, self.hshift, self.output_width, self.output_height]
        array.array('i', data).tofile(file_obj)
        array.array('f', self.pic.flat).tofile(file_obj)

    def __repr__(self):
        attrs = ["fwidth", "fheight", "wshift", "hshift", "output_width", "output_height", "output_size"]
        # getattr with a default keeps repr usable on partially loaded objects.
        fields = ", ".join("%s=%r" % (name, getattr(self, name, None)) for name in attrs)
        return "Filter(%s)" % fields
class StimulusSet:
    """Represents a set of images used as stimuli for neural network.

    A single stimulus is a frame, consisting of an image for every filter.
    Currently only stimulus sets with single filter are supported.

    Args:
        file_path (str, optional): if given, the stimulus set will be loaded from there
        pics (list of ndarray, optional): List of 2d numpy arrays representing stimulus pictures

    Attributes:
        pics (list of ndarray): list of images (2d numpy arrays)
        header (StimFileHeader): header of the loaded file, None if nothing was loaded
        filters (list of Filter): filters of the set, None until loaded or saved
    """

    def __init__(self, file_path=None, pics=None):
        """Initializes an instance of StimulusSet.

        Args:
            file_path (str, optional): if given, the stimulus set will be loaded from there
            pics (list of ndarray, optional): List of 2d numpy arrays representing stimulus pictures
        """
        self.header = None
        self.filters = None
        self.pics = [] if pics is None else pics
        if file_path is not None:
            # Pictures loaded from file take precedence over the pics argument.
            self.pics = self.from_file(file_path)

    def add_frame(self, pics=None):
        """Appends a frame to the stimulus set.

        Args:
            pics (list of ndarray): the pictures of the frame, one per filter

        Raises:
            NotImplementedError: if more than one picture is given
            IndexError: if no picture is given
        """
        if not pics:
            raise IndexError("add_frame() needs exactly one picture")
        if len(pics) > 1:
            raise NotImplementedError
        self.pics.append(pics[0])

    def get_output_width(self, filter_num=0):
        """Returns output_width of filter filter_num, 0 if there are no filters."""
        if self.filters is None:
            return 0
        return self.filters[filter_num].output_width

    def get_output_height(self, filter_num=0):
        """Returns output_height of filter filter_num, 0 if there are no filters."""
        if self.filters is None:
            return 0
        return self.filters[filter_num].output_height

    def from_file(self, file_path):
        """Loads a stimulus set from file_path.

        Sets the attributes header and filters. The pictures are returned,
        not stored.

        Args:
            file_path (str): path of the stimulus file

        Returns:
            list of ndarray: the pictures read from the file

        Raises:
            NotImplementedError: if the file contains more than one filter
            ValueError: if the filter declares a frame size that is not positive
        """
        # The with block closes the file even if reading fails half way.
        with open(file_path, "rb") as file_obj:
            self.header = StimFileHeader(file_obj)
            logging.debug(u"%s", self.header)
            self.filters = []
            for _ in range(self.header.nfilters):
                f = Filter(file_obj)
                self.filters.append(f)
                logging.debug(f)
            if self.header.nfilters > 1:
                raise NotImplementedError
            img_filter = self.filters[0]
            n = img_filter.output_size
            if n <= 0:
                # A zero sized frame would never hit the end-of-file test below.
                raise ValueError("invalid frame size %r in %r" % (n, file_path))
            n_bytes = 4 * n
            frame_format = 'f' * n
            frame_shape = (img_filter.output_height, img_filter.output_width)
            pics = []
            while True:
                data = file_obj.read(n_bytes)
                if len(data) < n_bytes:
                    # End of file (an incomplete trailing frame is dropped).
                    break
                pics.append(np.array(struct.unpack(frame_format, data)).reshape(frame_shape))
        return pics

    def save(self, file_path):
        """Saves the stimulus set to file_path.

        Raises:
            IndexError: if the stimulus set contains no pictures
            NotImplementedError: if the set has more than one filter
        """
        if not self.pics:
            # Fail before opening, so an existing file is not truncated.
            raise IndexError("cannot save a stimulus set without pictures")
        with open(file_path, "wb") as file_obj:
            self.save_header(file_obj)
            self.save_filters(file_obj)
            self.save_frames(file_obj)

    # only with 1 filter implemented
    def save_header(self, file_obj):
        """Writes the file header to file_obj.

        Width and height are taken from the shape of the first picture. If the
        set has no filters yet, a single default filter with that output size
        is created.
        """
        height, width = self.pics[0].shape
        if self.filters is None:
            self.filters = [Filter(output_width=width, output_height=height)]
        nfilters = len(self.filters)
        if nfilters > 1:
            raise NotImplementedError
        header = StimFileHeader(info="MindVideo", version="0.05", width=width, height=height, nfilters=nfilters)
        header.save(file_obj)

    def save_filters(self, file_obj):
        """Writes all filters to file_obj."""
        for f in self.filters:
            f.save(file_obj)

    def save_frames(self, file_obj):
        """Writes all pictures to file_obj as native floats, row by row."""
        if len(self.filters) > 1:
            raise NotImplementedError
        for pic in self.pics:
            array.array('f', pic.flat).tofile(file_obj)
def load_stim_movie():
    """Placeholder; not implemented, does nothing and returns None."""
    return None